hypercall/messaging/event_bus_trait.rs
1use async_trait::async_trait;
2use hypercall_types::EngineMessage;
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6
7/// Offsets format: topic -> partition -> offset
8pub type TopicOffsets = HashMap<String, HashMap<i32, i64>>;
9
10/// Handler for replayed events (boxed for object safety)
11pub type ReplayHandler = Box<dyn Fn(EngineMessage) + Send + Sync>;
12
13/// Envelope wrapping a message with legacy partition/offset coordinates.
14/// Used by subscribers that need to track exactly which messages they've processed.
15#[derive(Debug, Clone)]
16pub struct LegacyMessageEnvelope {
17 pub message: EngineMessage,
18 /// Partition this message came from (None for in-process event bus)
19 pub partition: Option<i32>,
20 /// Offset of this message (None for in-process event bus)
21 pub offset: Option<i64>,
22}
23
24/// Envelope wrapping a message with a monotonic sequence number.
25/// Preferred over LegacyMessageEnvelope for consumers that need ordering.
26#[derive(Debug, Clone)]
27pub struct MessageEnvelope {
28 pub message: EngineMessage,
29 /// Monotonic sequence number assigned by the event bus on dispatch.
30 pub sequence: u64,
31}
32
33#[async_trait]
34pub trait EventBusTrait: Send + Sync {
35 /// Get a sender to publish events
36 fn get_sender(&self) -> mpsc::UnboundedSender<EngineMessage>;
37
38 /// Subscribe to specific topics and get a receiver for events
39 async fn subscribe(
40 &self,
41 topics: Vec<String>,
42 ) -> Result<mpsc::UnboundedReceiver<EngineMessage>, String>;
43
44 /// Subscribe to specific topics and get a receiver for events WITH legacy offset metadata.
45 /// Use this when you need to track exactly which offsets you've processed (for snapshots).
46 async fn subscribe_with_offsets(
47 &self,
48 topics: Vec<String>,
49 ) -> Result<mpsc::UnboundedReceiver<LegacyMessageEnvelope>, String>;
50
51 /// Subscribe to specific topics and get a receiver for events WITH sequence metadata.
52 /// Use this when you need monotonic ordering.
53 async fn subscribe_with_sequence(
54 &self,
55 topics: Vec<String>,
56 ) -> Result<mpsc::UnboundedReceiver<MessageEnvelope>, String>;
57
58 /// Start processing events (forwarding between channels)
59 async fn start_processing(self: Arc<Self>);
60
61 /// Replay events from specific offsets until caught up to the current head.
62 ///
63 /// This is used during startup to replay events from the last snapshot
64 /// to the current state before marking the service as ready.
65 ///
66 /// Returns Ok(final_offsets) when caught up, where final_offsets are the
67 /// high water marks that were reached. Returns Err if replay fails.
68 /// The handler is called for each replayed message.
69 async fn replay_from_offsets(
70 &self,
71 offsets: TopicOffsets,
72 handler: ReplayHandler,
73 ) -> Result<TopicOffsets, String>;
74
75 /// Get the current consumer offsets for all topics.
76 ///
77 /// Returns the latest offset processed for each topic/partition.
78 /// Used by snapshot services to record the exact position in the event stream
79 /// that corresponds to the snapshot state.
80 ///
81 /// For in-process implementations, returns an empty map.
82 fn get_current_offsets(&self) -> TopicOffsets;
83
84 /// Get the current sequence number (highest dispatched).
85 fn get_current_sequence(&self) -> u64;
86}