Skip to main content

hypercall/messaging/
channel_event_bus.rs

1use async_trait::async_trait;
2use std::collections::HashMap;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5use tokio::sync::{mpsc, RwLock};
6use tracing::{debug, info};
7
8use super::event_bus_trait::{EventBusTrait, LegacyMessageEnvelope, MessageEnvelope, TopicOffsets};
9use hypercall_types::EngineMessage;
10
11/// In-process channel-based event bus.
12pub struct ChannelEventBus {
13    // Channel for sending events
14    sender: mpsc::UnboundedSender<EngineMessage>,
15    receiver: Arc<RwLock<Option<mpsc::UnboundedReceiver<EngineMessage>>>>,
16
17    // Subscribers - maps topics to channel senders
18    subscribers: Arc<RwLock<HashMap<String, Vec<mpsc::UnboundedSender<EngineMessage>>>>>,
19
20    // Subscribers that want envelope with legacy offset metadata
21    subscribers_with_offsets:
22        Arc<RwLock<HashMap<String, Vec<mpsc::UnboundedSender<LegacyMessageEnvelope>>>>>,
23
24    // Subscribers that want envelope with sequence metadata
25    subscribers_with_sequence:
26        Arc<RwLock<HashMap<String, Vec<mpsc::UnboundedSender<MessageEnvelope>>>>>,
27
28    /// Monotonic sequence counter, incremented on each event dispatch.
29    sequence: Arc<AtomicU64>,
30
31    /// When set, replay_from_offsets returns this error instead of Ok.
32    /// Used to test graceful handling of replay failures.
33    replay_error: Arc<RwLock<Option<String>>>,
34}
35
36/// Backwards-compatible alias.
37pub type MockEventBus = ChannelEventBus;
38
39impl ChannelEventBus {
40    pub fn new() -> Result<Self, String> {
41        let (sender, receiver) = mpsc::unbounded_channel();
42
43        Ok(Self {
44            sender,
45            receiver: Arc::new(RwLock::new(Some(receiver))),
46            subscribers: Arc::new(RwLock::new(HashMap::new())),
47            subscribers_with_offsets: Arc::new(RwLock::new(HashMap::new())),
48            subscribers_with_sequence: Arc::new(RwLock::new(HashMap::new())),
49            sequence: Arc::new(AtomicU64::new(0)),
50            replay_error: Arc::new(RwLock::new(None)),
51        })
52    }
53
54    /// Clear all subscribers, simulating a SIGKILL that severs the event bus pipeline.
55    /// After this call, events sent via `get_sender()` will still be accepted but
56    /// silently dropped (no subscriber receives them).
57    pub async fn clear_subscribers(&self) {
58        self.subscribers.write().await.clear();
59        self.subscribers_with_offsets.write().await.clear();
60        self.subscribers_with_sequence.write().await.clear();
61    }
62
63    /// Configure replay_from_offsets to return an error, simulating replay failures.
64    pub async fn set_replay_error(&self, error: Option<String>) {
65        *self.replay_error.write().await = error;
66    }
67
68    fn get_topic_for_event(event: &EngineMessage) -> &'static str {
69        use crate::shared::topics::*;
70        match event {
71            EngineMessage::OrderAction(_) => TOPIC_ORDER_ACTIONS,
72            EngineMessage::OrderUpdate(_) => TOPIC_ORDER_UPDATES,
73            EngineMessage::OrderInfo(_) => TOPIC_ORDER_INFOS,
74            EngineMessage::MarketAction(_) => TOPIC_MARKET_ACTIONS,
75            EngineMessage::MarketUpdate(_) => TOPIC_MARKET_UPDATES,
76            EngineMessage::OrderFilled { .. } => TOPIC_FILLS,
77            EngineMessage::OrderbookUpdated(_) => TOPIC_ORDERBOOK_UPDATES,
78            EngineMessage::L2Update(_) => TOPIC_L2_UPDATES,
79            EngineMessage::Trade(_) => TOPIC_TRADES,
80            EngineMessage::TransactionRequest(_) => TOPIC_TRANSACTION_REQUESTS,
81            EngineMessage::TransactionUpdate(_) => TOPIC_TRANSACTION_UPDATES,
82            EngineMessage::MmpTriggered(_) => TOPIC_MMP_TRIGGERS,
83            EngineMessage::PositionExpired(_) => TOPIC_POSITION_EXPIRED,
84            EngineMessage::TierUpdate(_) => TOPIC_TIER_UPDATES,
85            EngineMessage::HypercorePositionUpdate(_) => TOPIC_HYPERCORE_POSITION_UPDATES,
86            EngineMessage::LiquidationStateChange(_) => TOPIC_LIQUIDATION_STATE,
87            EngineMessage::RfqFilled(_) => TOPIC_RFQ_FILLS,
88        }
89    }
90}
91
92#[async_trait]
93impl EventBusTrait for ChannelEventBus {
94    fn get_sender(&self) -> mpsc::UnboundedSender<EngineMessage> {
95        self.sender.clone()
96    }
97
98    async fn subscribe(
99        &self,
100        topics: Vec<String>,
101    ) -> Result<mpsc::UnboundedReceiver<EngineMessage>, String> {
102        let (tx, rx) = mpsc::unbounded_channel();
103
104        let mut subscribers = self.subscribers.write().await;
105        for topic in &topics {
106            subscribers
107                .entry(topic.clone())
108                .or_insert_with(Vec::new)
109                .push(tx.clone());
110        }
111
112        Ok(rx)
113    }
114
115    async fn subscribe_with_offsets(
116        &self,
117        topics: Vec<String>,
118    ) -> Result<mpsc::UnboundedReceiver<LegacyMessageEnvelope>, String> {
119        let (tx, rx) = mpsc::unbounded_channel();
120
121        let mut subscribers = self.subscribers_with_offsets.write().await;
122        for topic in &topics {
123            subscribers
124                .entry(topic.clone())
125                .or_insert_with(Vec::new)
126                .push(tx.clone());
127        }
128
129        Ok(rx)
130    }
131
132    async fn subscribe_with_sequence(
133        &self,
134        topics: Vec<String>,
135    ) -> Result<mpsc::UnboundedReceiver<MessageEnvelope>, String> {
136        let (tx, rx) = mpsc::unbounded_channel();
137
138        let mut subscribers = self.subscribers_with_sequence.write().await;
139        for topic in &topics {
140            subscribers
141                .entry(topic.clone())
142                .or_insert_with(Vec::new)
143                .push(tx.clone());
144        }
145
146        Ok(rx)
147    }
148
149    async fn start_processing(self: Arc<Self>) {
150        // Take the receiver
151        let mut receiver = {
152            let mut lock = self.receiver.write().await;
153            lock.take().expect("start_processing called twice")
154        };
155
156        // Forward events from sender to subscribers
157        tokio::spawn(async move {
158            while let Some(event) = receiver.recv().await {
159                debug!("MockEventBus: Processing event {:?}", event);
160
161                // Forward to subscribers based on topic
162                let topic = Self::get_topic_for_event(&event);
163                let subscribers = self.subscribers.read().await;
164                if let Some(subs) = subscribers.get(topic) {
165                    for sub in subs {
166                        let _ = sub.send(event.clone());
167                    }
168                }
169                drop(subscribers);
170
171                // Assign monotonic sequence number
172                let seq = self.sequence.fetch_add(1, Ordering::Relaxed) + 1;
173
174                // Forward to legacy envelope subscribers (offsets always None for in-process bus)
175                let envelope_subscribers = self.subscribers_with_offsets.read().await;
176                if let Some(subs) = envelope_subscribers.get(topic) {
177                    let envelope = LegacyMessageEnvelope {
178                        message: event.clone(),
179                        partition: None,
180                        offset: None,
181                    };
182                    for sub in subs {
183                        let _ = sub.send(envelope.clone());
184                    }
185                }
186                drop(envelope_subscribers);
187
188                // Forward to sequence subscribers
189                let seq_subscribers = self.subscribers_with_sequence.read().await;
190                if let Some(subs) = seq_subscribers.get(topic) {
191                    let envelope = MessageEnvelope {
192                        message: event.clone(),
193                        sequence: seq,
194                    };
195                    for sub in subs {
196                        let _ = sub.send(envelope.clone());
197                    }
198                }
199            }
200        });
201    }
202
203    async fn replay_from_offsets(
204        &self,
205        offsets: TopicOffsets,
206        _handler: super::event_bus_trait::ReplayHandler,
207    ) -> Result<TopicOffsets, String> {
208        // Check if a simulated error is configured
209        if let Some(error) = self.replay_error.read().await.as_ref() {
210            info!(
211                "MockEventBus: replay_from_offsets returning simulated error: {}",
212                error
213            );
214            return Err(error.clone());
215        }
216
217        // Mock doesn't have persistent storage, so nothing to replay
218        // Return the same offsets as "final" since nothing was replayed
219        info!(
220            "MockEventBus: replay_from_offsets called with {} topics (no-op)",
221            offsets.len()
222        );
223        Ok(offsets)
224    }
225
226    fn get_current_offsets(&self) -> TopicOffsets {
227        // In-process event bus has no offsets to track
228        HashMap::new()
229    }
230
231    fn get_current_sequence(&self) -> u64 {
232        self.sequence.load(Ordering::Relaxed)
233    }
234}