Skip to main content

hypercall/messaging/
event_bus_trait.rs

1use async_trait::async_trait;
2use hypercall_types::EngineMessage;
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6
7/// Offsets format: topic -> partition -> offset
8pub type TopicOffsets = HashMap<String, HashMap<i32, i64>>;
9
10/// Handler for replayed events (boxed for object safety)
11pub type ReplayHandler = Box<dyn Fn(EngineMessage) + Send + Sync>;
12
13/// Envelope wrapping a message with legacy partition/offset coordinates.
14/// Used by subscribers that need to track exactly which messages they've processed.
15#[derive(Debug, Clone)]
16pub struct LegacyMessageEnvelope {
17    pub message: EngineMessage,
18    /// Partition this message came from (None for in-process event bus)
19    pub partition: Option<i32>,
20    /// Offset of this message (None for in-process event bus)
21    pub offset: Option<i64>,
22}
23
24/// Envelope wrapping a message with a monotonic sequence number.
25/// Preferred over LegacyMessageEnvelope for consumers that need ordering.
26#[derive(Debug, Clone)]
27pub struct MessageEnvelope {
28    pub message: EngineMessage,
29    /// Monotonic sequence number assigned by the event bus on dispatch.
30    pub sequence: u64,
31}
32
33#[async_trait]
34pub trait EventBusTrait: Send + Sync {
35    /// Get a sender to publish events
36    fn get_sender(&self) -> mpsc::UnboundedSender<EngineMessage>;
37
38    /// Subscribe to specific topics and get a receiver for events
39    async fn subscribe(
40        &self,
41        topics: Vec<String>,
42    ) -> Result<mpsc::UnboundedReceiver<EngineMessage>, String>;
43
44    /// Subscribe to specific topics and get a receiver for events WITH legacy offset metadata.
45    /// Use this when you need to track exactly which offsets you've processed (for snapshots).
46    async fn subscribe_with_offsets(
47        &self,
48        topics: Vec<String>,
49    ) -> Result<mpsc::UnboundedReceiver<LegacyMessageEnvelope>, String>;
50
51    /// Subscribe to specific topics and get a receiver for events WITH sequence metadata.
52    /// Use this when you need monotonic ordering.
53    async fn subscribe_with_sequence(
54        &self,
55        topics: Vec<String>,
56    ) -> Result<mpsc::UnboundedReceiver<MessageEnvelope>, String>;
57
58    /// Start processing events (forwarding between channels)
59    async fn start_processing(self: Arc<Self>);
60
61    /// Replay events from specific offsets until caught up to the current head.
62    ///
63    /// This is used during startup to replay events from the last snapshot
64    /// to the current state before marking the service as ready.
65    ///
66    /// Returns Ok(final_offsets) when caught up, where final_offsets are the
67    /// high water marks that were reached. Returns Err if replay fails.
68    /// The handler is called for each replayed message.
69    async fn replay_from_offsets(
70        &self,
71        offsets: TopicOffsets,
72        handler: ReplayHandler,
73    ) -> Result<TopicOffsets, String>;
74
75    /// Get the current consumer offsets for all topics.
76    ///
77    /// Returns the latest offset processed for each topic/partition.
78    /// Used by snapshot services to record the exact position in the event stream
79    /// that corresponds to the snapshot state.
80    ///
81    /// For in-process implementations, returns an empty map.
82    fn get_current_offsets(&self) -> TopicOffsets;
83
84    /// Get the current sequence number (highest dispatched).
85    fn get_current_sequence(&self) -> u64;
86}