Skip to main content

hypercall_competition/
recorder.rs

1//! Downstream consumer that records completed fills into the competition tables.
2//!
3//! This is the ingestion side of competitions, decoupled from the websocket
4//! broadcast path. The recorder consumes a stream of [`EngineMessage`] and, for
5//! each `OrderFilled`, persists a competition fill.
6//!
7//! ## Source-agnostic by design
8//!
9//! `run` takes a plain `mpsc::UnboundedReceiver<EngineMessage>`, so it does not
10//! care where the fill stream comes from. Today the runtime feeds it from the
11//! in-process engine EventBus (`TOPIC_FILLS` subscription). When the engine runs
12//! out-of-process (see the transport-evolution roadmap), the same recorder is fed
13//! by a NATS JetStream consumer instead, with no change here. Other downstream
14//! services (analytics, auditors) subscribe to the same stream the same way.
15//!
16//! ## Failure handling
17//!
18//! A competition write failure is logged and metered but does NOT crash the
19//! engine node. The previous implementation recorded fills inline in the
20//! websocket forwarder and triggered a full process shutdown on failure, so a
21//! transient leaderboard DB error could take down trading. Fill recording is
22//! idempotent (`record_competition_fill` is `ON CONFLICT (trade_id, wallet) DO
23//! NOTHING`), so a dropped fill is safely re-recordable.
24//!
25//! ## Durability caveat (not yet a durable follower)
26//!
27//! This consumes an in-memory event stream. Fills already written to
28//! `competition_fill_events` survive restart, but a fill emitted but not yet
29//! recorded when the node dies is NOT recovered: engine replay rebuilds engine
30//! state without re-emitting fills to the event bus. Closing that crash window
31//! requires making competition a durable external-follower projection that
32//! replays the fill stream from the journal/NATS at a recorded offset (tracked
33//! separately).
34
35use std::sync::Arc;
36
37use async_trait::async_trait;
38use hypercall_runtime_api::ShutdownRx;
39use hypercall_types::{EngineMessage, Fill};
40use tokio::sync::mpsc::UnboundedReceiver;
41
42use crate::CompetitionService;
43
44/// Minimal sink the recorder depends on: persist one competition fill.
45///
46/// Implemented by [`CompetitionService`]; abstracted so the recorder's only
47/// dependency is "record a fill", which keeps it decoupled and unit-testable.
48#[async_trait]
49pub trait CompetitionFillSink: Send + Sync {
50    async fn record_fill(&self, fill: &Fill) -> anyhow::Result<()>;
51}
52
53#[async_trait]
54impl CompetitionFillSink for CompetitionService {
55    async fn record_fill(&self, fill: &Fill) -> anyhow::Result<()> {
56        CompetitionService::record_fill(self, fill)
57            .await
58            .map_err(anyhow::Error::new)
59    }
60}
61
62/// Consumes the engine fill stream and records competition fills.
63pub struct CompetitionFillRecorder {
64    sink: Arc<dyn CompetitionFillSink>,
65}
66
67impl CompetitionFillRecorder {
68    pub fn new(sink: Arc<dyn CompetitionFillSink>) -> Self {
69        Self { sink }
70    }
71
72    /// Record every `OrderFilled` until the event stream closes or shutdown fires.
73    ///
74    /// Intended to be spawned in the runtime task group. Returns when either the
75    /// sender side of `events` is dropped or the `shutdown` signal is triggered.
76    /// Selecting on `shutdown` is required: the event-bus subscription is not
77    /// closed during graceful shutdown, so without it the task group would block
78    /// on this task until its shutdown timeout elapsed.
79    pub async fn run(self, mut events: UnboundedReceiver<EngineMessage>, mut shutdown: ShutdownRx) {
80        tracing::info!("CompetitionFillRecorder started");
81        loop {
82            let event = tokio::select! {
83                _ = shutdown.recv() => {
84                    tracing::info!("CompetitionFillRecorder received shutdown signal");
85                    break;
86                }
87                maybe_event = events.recv() => match maybe_event {
88                    Some(event) => event,
89                    None => {
90                        tracing::info!(
91                            "CompetitionFillRecorder stopped: engine event stream closed"
92                        );
93                        break;
94                    }
95                },
96            };
97            let EngineMessage::OrderFilled { fill, .. } = &event else {
98                continue;
99            };
100            match self.sink.record_fill(fill).await {
101                Ok(()) => {
102                    metrics::counter!("competition_fills_recorded_total").increment(1);
103                }
104                Err(error) => {
105                    tracing::error!(
106                        trade_id = fill.trade_id,
107                        %error,
108                        "failed to record competition fill; continuing (recording is idempotent)"
109                    );
110                    metrics::counter!("competition_fill_record_failures_total").increment(1);
111                }
112            }
113        }
114    }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use hypercall_runtime_api::Shutdown;
    use hypercall_types::{FillAccounting, FillSource, Side, WalletAddress};
    use rust_decimal_macros::dec;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Duration;
    use tokio::sync::mpsc;

    /// Test double that counts `record_fill` calls and can be told to fail them.
    struct MockSink {
        /// Number of `record_fill` invocations, whether or not they succeeded.
        recorded: AtomicU64,
        /// When `true`, every `record_fill` call returns an error.
        fail: bool,
    }

    impl MockSink {
        /// Create a shared sink; `fail` selects the always-erroring behavior.
        fn new(fail: bool) -> Arc<Self> {
            let sink = Self {
                recorded: AtomicU64::new(0),
                fail,
            };
            Arc::new(sink)
        }

        /// How many times `record_fill` has been called so far.
        fn count(&self) -> u64 {
            self.recorded.load(Ordering::SeqCst)
        }
    }

    #[async_trait]
    impl CompetitionFillSink for MockSink {
        async fn record_fill(&self, _fill: &Fill) -> anyhow::Result<()> {
            // Count the attempt first so failed calls are still visible to tests.
            self.recorded.fetch_add(1, Ordering::SeqCst);
            if self.fail {
                Err(anyhow::anyhow!("simulated competition DB failure"))
            } else {
                Ok(())
            }
        }
    }

    /// Build an `OrderFilled` message carrying a fixed fill with the given id.
    fn order_filled(trade_id: u64) -> EngineMessage {
        let fill = Fill {
            trade_id,
            taker_order_id: 1,
            maker_order_id: 2,
            symbol: "BTC-20260331-100000-C".to_string(),
            price: dec!(100),
            size: dec!(1),
            taker_side: Side::Buy,
            taker_wallet_address: WalletAddress::from([1u8; 20]),
            maker_wallet_address: WalletAddress::from([2u8; 20]),
            fee: dec!(0),
            is_taker: true,
            timestamp: 0,
            builder_code_address: None,
            builder_code_fee: None,
            source: FillSource::default(),
            taker_realized_pnl: None,
            maker_realized_pnl: None,
            underlying_notional: None,
        };
        EngineMessage::OrderFilled {
            fill,
            accounting: FillAccounting::default(),
        }
    }

    /// Queue one `OrderFilled` per id, close the stream, and run the recorder
    /// until it returns. Shutdown is never triggered, so the loop can only exit
    /// through stream closure.
    async fn run_until_stream_closes(sink: &Arc<MockSink>, trade_ids: &[u64]) {
        let (tx, rx) = mpsc::unbounded_channel();
        for &trade_id in trade_ids {
            tx.send(order_filled(trade_id)).unwrap();
        }
        drop(tx); // close the stream so run() returns

        let shutdown = Shutdown::new();
        CompetitionFillRecorder::new(sink.clone())
            .run(rx, shutdown.subscribe())
            .await;
    }

    #[tokio::test]
    async fn records_each_order_filled() {
        let sink = MockSink::new(false);

        run_until_stream_closes(&sink, &[1, 2, 3]).await;

        assert_eq!(sink.count(), 3, "every OrderFilled should be recorded");
    }

    #[tokio::test]
    async fn record_failure_does_not_stop_the_loop() {
        // Core guarantee of the recorder: a failing competition DB must neither
        // crash nor stall it, and later fills must still be attempted.
        let sink = MockSink::new(true);

        // Has to finish without panicking even though every write errors.
        run_until_stream_closes(&sink, &[1, 2]).await;

        assert_eq!(
            sink.count(),
            2,
            "both fills attempted even though recording errored"
        );
    }

    #[tokio::test]
    async fn shutdown_signal_stops_the_loop_with_stream_open() {
        // Regression guard: the runtime runs this inside a task group and does
        // not close the event-bus subscription on shutdown, so `run` has to
        // return on the shutdown signal while `tx` is still alive. Otherwise
        // graceful shutdown would block until the task group's timeout (the
        // failure mode that took down the heavy E2E shutdown assertion).
        let sink = MockSink::new(false);
        let (tx, rx) = mpsc::unbounded_channel();
        let shutdown = Shutdown::new();
        let recorder = CompetitionFillRecorder::new(sink.clone());
        let handle = tokio::spawn(recorder.run(rx, shutdown.subscribe()));

        shutdown.trigger();

        tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("recorder did not exit on shutdown signal")
            .expect("recorder task panicked");

        // The sender outlives the task: exit came from shutdown, not stream close.
        drop(tx);
    }
}