hypercall_competition/recorder.rs
1//! Downstream consumer that records completed fills into the competition tables.
2//!
3//! This is the ingestion side of competitions, decoupled from the websocket
4//! broadcast path. The recorder consumes a stream of [`EngineMessage`] and, for
5//! each `OrderFilled`, persists a competition fill.
6//!
7//! ## Source-agnostic by design
8//!
9//! `run` takes a plain `mpsc::UnboundedReceiver<EngineMessage>`, so it does not
10//! care where the fill stream comes from. Today the runtime feeds it from the
11//! in-process engine EventBus (`TOPIC_FILLS` subscription). When the engine runs
12//! out-of-process (see the transport-evolution roadmap), the same recorder is fed
13//! by a NATS JetStream consumer instead, with no change here. Other downstream
14//! services (analytics, auditors) subscribe to the same stream the same way.
15//!
16//! ## Failure handling
17//!
18//! A competition write failure is logged and metered but does NOT crash the
19//! engine node. The previous implementation recorded fills inline in the
20//! websocket forwarder and triggered a full process shutdown on failure, so a
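//! transient leaderboard DB error could take down trading. Fill recording is
//! idempotent (`record_competition_fill` is `ON CONFLICT (trade_id, wallet) DO
//! NOTHING`), so delivering the same fill again is harmless: it is persisted at
//! most once. A failed write is only logged and metered, not retried; recording
//! it later requires re-delivering the fill (see the durability caveat below).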
24//!
25//! ## Durability caveat (not yet a durable follower)
26//!
27//! This consumes an in-memory event stream. Fills already written to
//! `competition_fill_events` survive restart, but a fill that was emitted and
//! not yet recorded when the node dies is NOT recovered: engine replay rebuilds
//! engine state without re-emitting fills to the event bus. The same applies on
//! graceful shutdown: `run` returns as soon as the shutdown signal fires, without
//! draining fills still queued in the channel. Closing these windows requires
//! making competition a durable external-follower projection that replays the
//! fill stream from the journal/NATS at a recorded offset (tracked separately).
34
35use std::sync::Arc;
36
37use async_trait::async_trait;
38use hypercall_runtime_api::ShutdownRx;
39use hypercall_types::{EngineMessage, Fill};
40use tokio::sync::mpsc::UnboundedReceiver;
41
42use crate::CompetitionService;
43
44/// Minimal sink the recorder depends on: persist one competition fill.
45///
46/// Implemented by [`CompetitionService`]; abstracted so the recorder's only
47/// dependency is "record a fill", which keeps it decoupled and unit-testable.
48#[async_trait]
49pub trait CompetitionFillSink: Send + Sync {
50 async fn record_fill(&self, fill: &Fill) -> anyhow::Result<()>;
51}
52
53#[async_trait]
54impl CompetitionFillSink for CompetitionService {
55 async fn record_fill(&self, fill: &Fill) -> anyhow::Result<()> {
56 CompetitionService::record_fill(self, fill)
57 .await
58 .map_err(anyhow::Error::new)
59 }
60}
61
62/// Consumes the engine fill stream and records competition fills.
63pub struct CompetitionFillRecorder {
64 sink: Arc<dyn CompetitionFillSink>,
65}
66
67impl CompetitionFillRecorder {
68 pub fn new(sink: Arc<dyn CompetitionFillSink>) -> Self {
69 Self { sink }
70 }
71
72 /// Record every `OrderFilled` until the event stream closes or shutdown fires.
73 ///
74 /// Intended to be spawned in the runtime task group. Returns when either the
75 /// sender side of `events` is dropped or the `shutdown` signal is triggered.
76 /// Selecting on `shutdown` is required: the event-bus subscription is not
77 /// closed during graceful shutdown, so without it the task group would block
78 /// on this task until its shutdown timeout elapsed.
79 pub async fn run(self, mut events: UnboundedReceiver<EngineMessage>, mut shutdown: ShutdownRx) {
80 tracing::info!("CompetitionFillRecorder started");
81 loop {
82 let event = tokio::select! {
83 _ = shutdown.recv() => {
84 tracing::info!("CompetitionFillRecorder received shutdown signal");
85 break;
86 }
87 maybe_event = events.recv() => match maybe_event {
88 Some(event) => event,
89 None => {
90 tracing::info!(
91 "CompetitionFillRecorder stopped: engine event stream closed"
92 );
93 break;
94 }
95 },
96 };
97 let EngineMessage::OrderFilled { fill, .. } = &event else {
98 continue;
99 };
100 match self.sink.record_fill(fill).await {
101 Ok(()) => {
102 metrics::counter!("competition_fills_recorded_total").increment(1);
103 }
104 Err(error) => {
105 tracing::error!(
106 trade_id = fill.trade_id,
107 %error,
108 "failed to record competition fill; continuing (recording is idempotent)"
109 );
110 metrics::counter!("competition_fill_record_failures_total").increment(1);
111 }
112 }
113 }
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use hypercall_runtime_api::Shutdown;
    use hypercall_types::{FillAccounting, FillSource, Side, WalletAddress};
    use rust_decimal_macros::dec;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Duration;
    use tokio::sync::mpsc;

    /// In-memory stand-in for the competition DB. It counts every
    /// `record_fill` attempt and, when `always_fail` is set, errors on each one.
    struct CountingSink {
        attempts: AtomicU64,
        always_fail: bool,
    }

    impl CountingSink {
        fn new(always_fail: bool) -> Arc<Self> {
            let sink = Self {
                attempts: AtomicU64::new(0),
                always_fail,
            };
            Arc::new(sink)
        }

        /// Number of `record_fill` calls observed so far, successful or not.
        fn attempts(&self) -> u64 {
            self.attempts.load(Ordering::SeqCst)
        }
    }

    #[async_trait]
    impl CompetitionFillSink for CountingSink {
        async fn record_fill(&self, _fill: &Fill) -> anyhow::Result<()> {
            // Count first so that failed attempts are observable too.
            self.attempts.fetch_add(1, Ordering::SeqCst);
            if self.always_fail {
                return Err(anyhow::anyhow!("simulated competition DB failure"));
            }
            Ok(())
        }
    }

    /// Wraps a dummy fill carrying `trade_id` in an `OrderFilled` event.
    fn order_filled(trade_id: u64) -> EngineMessage {
        let fill = Fill {
            trade_id,
            taker_order_id: 1,
            maker_order_id: 2,
            symbol: "BTC-20260331-100000-C".to_string(),
            price: dec!(100),
            size: dec!(1),
            taker_side: Side::Buy,
            taker_wallet_address: WalletAddress::from([1u8; 20]),
            maker_wallet_address: WalletAddress::from([2u8; 20]),
            fee: dec!(0),
            is_taker: true,
            timestamp: 0,
            builder_code_address: None,
            builder_code_fee: None,
            source: FillSource::default(),
            taker_realized_pnl: None,
            maker_realized_pnl: None,
            underlying_notional: None,
        };
        EngineMessage::OrderFilled {
            fill,
            accounting: FillAccounting::default(),
        }
    }

    /// Returns a receiver pre-loaded with one `OrderFilled` per id. Its sender
    /// is already dropped, so `run` returns once the queue is drained.
    fn finished_stream(trade_ids: &[u64]) -> mpsc::UnboundedReceiver<EngineMessage> {
        let (tx, rx) = mpsc::unbounded_channel();
        for &trade_id in trade_ids {
            tx.send(order_filled(trade_id)).unwrap();
        }
        rx
    }

    #[tokio::test]
    async fn records_each_order_filled() {
        let sink = CountingSink::new(false);
        let events = finished_stream(&[1, 2, 3]);
        let shutdown = Shutdown::new();

        CompetitionFillRecorder::new(sink.clone())
            .run(events, shutdown.subscribe())
            .await;

        assert_eq!(sink.attempts(), 3, "every OrderFilled should be recorded");
    }

    #[tokio::test]
    async fn record_failure_does_not_stop_the_loop() {
        // A competition DB failure must neither crash nor stall the recorder:
        // every later fill is still attempted.
        let sink = CountingSink::new(true);
        let events = finished_stream(&[1, 2]);
        let shutdown = Shutdown::new();

        // Must return normally even though every record_fill errors.
        CompetitionFillRecorder::new(sink.clone())
            .run(events, shutdown.subscribe())
            .await;

        assert_eq!(
            sink.attempts(),
            2,
            "both fills attempted even though recording errored"
        );
    }

    #[tokio::test]
    async fn shutdown_signal_stops_the_loop_with_stream_open() {
        // Regression guard. The runtime spawns `run` in a task group and never
        // closes the event-bus subscription on shutdown, so `run` must return on
        // the shutdown signal even while the sender is alive. Otherwise graceful
        // shutdown waits for the task group's timeout.
        let sink = CountingSink::new(false);
        let (sender, receiver) = mpsc::unbounded_channel();
        let shutdown = Shutdown::new();
        let recorder = CompetitionFillRecorder::new(sink.clone());
        let handle = tokio::spawn(recorder.run(receiver, shutdown.subscribe()));

        shutdown.trigger();

        let joined = tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("recorder did not exit on shutdown signal");
        joined.expect("recorder task panicked");

        // `sender` is still alive here, so the exit above came from the
        // shutdown signal rather than from end-of-stream.
        drop(sender);
    }
}