Skip to main content

hypercall_api/
notification_service.rs

1//! Persisted per-user notification feed.
2//!
3//! Postgres is the source of truth. Redis (Upstash, globally distributed)
4//! holds an edge-cached `has_unread:{wallet}` bit so SSR / SSR-like reads
5//! don't have to cross the Pacific to Tokyo-pg for the bell dot.
6//!
7//! Web Push delivery (handled by `PushNotificationService`) is a separate
8//! display channel on top of this log: if the user is subscribed they get
9//! an OS notification too, but the feed stays complete either way.
10//!
11//! **Graceful degradation:** the service stays fully functional when Redis
12//! is unavailable -- pg writes/reads proceed normally; the bell cache just
13//! doesn't update and SSR readers fall back to no-cached-bit behavior.
14//! Publishing a fill should never fail because of Redis.
15//!
//! **Backpressure:** callers on the fill hot path use [`enqueue_publish`],
//! which `try_send`s onto a bounded mpsc. A single background drainer
//! awaits the inserts one at a time (concurrency of one). Burst loads that
//! can't keep up drop the tail (each drop is reported via a `warn!` log)
//! rather than spawning an unbounded pile of detached `tokio::spawn` tasks.
21//!
22//! **Has-unread bit race:** `mark_*_read` clears the Redis bit *after*
23//! checking pg is empty, and then re-checks pg one more time to catch a
24//! concurrent `publish` that raced in during the clear. Not strictly
25//! atomic (would need a Lua script with an out-of-band pg check), but
26//! closes the common race window where a publisher lands between our pg
27//! check and our DEL.
28//!
29//! **No in-process dedupe cache on the has_unread bit.** An earlier
30//! revision kept a `DashSet<wallet>` to skip re-`SET`s. That cache isn't
31//! invalidated across instances after a mark-read (instance B clears
32//! Redis, instance A keeps suppressing), so correctness beats the
33//! optimization: `SET k 1` is cheap and idempotent.
34//!
35//! Payloads are stored as msgpack bytes: we never query into them, and the
36//! encoder is cheaper + more compact than JSON for the volumes we expect.
37
38use anyhow::{Context, Result};
39use chrono::Utc;
40use redis::aio::ConnectionManager;
41use redis::AsyncCommands;
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::Arc;
44use std::time::Duration;
45use tokio::sync::{broadcast, mpsc, Mutex};
46use tracing::{debug, info, warn};
47
48use hypercall_db::{NewNotificationInput, NotificationRecord, NotificationWriter};
49
/// Redis key namespace for the per-wallet "has unread" flag. The lowercased
/// wallet address is appended to form the full key.
const HAS_UNREAD_KEY_PREFIX: &str = "notifications:has_unread:";
/// Page size `list` uses when the caller passes no limit.
const DEFAULT_LIST_LIMIT: i64 = 50;
/// Largest page size `list` will honour; bigger requests are clamped to this.
const MAX_LIST_LIMIT: i64 = 100;
/// Deadline applied to each Redis operation (opening the connection manager,
/// `SET`, `DEL`).
const REDIS_COMMAND_TIMEOUT: Duration = Duration::from_secs(2);
/// How long Redis is treated as unavailable after a failed connect attempt.
/// While this window is open, cache calls return immediately instead of
/// waiting out `REDIS_COMMAND_TIMEOUT` again, so an outage cannot slow the
/// drainer down until the publish queue fills.
const REDIS_FAILURE_BACKOFF: Duration = Duration::from_secs(30);
/// Capacity of the bounded fire-and-forget publish channel. When the drainer
/// falls behind and the channel is full, new publishes are dropped with a
/// `warn!` (best-effort delivery, matching `PushNotificationService`).
const PUBLISH_QUEUE_CAPACITY: usize = 1_000;
/// Time budget the drainer gets for flushing already-queued publishes once
/// the shutdown signal fires. After this the drainer exits even if the queue
/// still has items.
const SHUTDOWN_DRAIN_TIMEOUT: Duration = Duration::from_secs(5);
67
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch, so the
/// function never fails.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
74
75fn has_unread_key(wallet_lower: &str) -> String {
76    format!("{HAS_UNREAD_KEY_PREFIX}{wallet_lower}")
77}
78
/// One queued fire-and-forget publish, carried over the bounded channel from
/// `enqueue_publish` to the background drainer.
struct PublishJob {
    /// Recipient wallet address, lowercased when the job was enqueued.
    wallet_lower: String,
    /// Notification type string, forwarded unchanged to the insert.
    notification_type: String,
    /// Notification payload bytes (msgpack-encoded by the caller).
    payload: Vec<u8>,
}
84
85pub struct NotificationService {
86    db: Arc<dyn NotificationWriter>,
87    /// Lazily-established Redis connection. `None` when the service was
88    /// constructed without a Redis client -- cache ops become no-ops.
89    redis_client: Option<redis::Client>,
90    redis_conn: Arc<Mutex<Option<ConnectionManager>>>,
91    /// ms timestamp of the last Redis connect failure; `0` means "no
92    /// recent failure, try again". Atomic so every path can read without
93    /// taking the Mutex.
94    redis_last_failure_ms: Arc<AtomicU64>,
95    publish_tx: mpsc::Sender<PublishJob>,
96}
97
98impl NotificationService {
99    pub fn new(
100        db: Arc<dyn NotificationWriter>,
101        redis_client: Option<redis::Client>,
102        shutdown_rx: broadcast::Receiver<()>,
103    ) -> Arc<Self> {
104        let (publish_tx, publish_rx) = mpsc::channel::<PublishJob>(PUBLISH_QUEUE_CAPACITY);
105        let svc = Arc::new(Self {
106            db,
107            redis_client,
108            redis_conn: Arc::new(Mutex::new(None)),
109            redis_last_failure_ms: Arc::new(AtomicU64::new(0)),
110            publish_tx,
111        });
112        let drainer = svc.clone();
113        tokio::spawn(drain_publish_queue(drainer, publish_rx, shutdown_rx));
114        svc
115    }
116
117    /// Are we inside the "recently failed" backoff window?
118    fn in_redis_backoff(&self) -> bool {
119        let last = self.redis_last_failure_ms.load(Ordering::Relaxed);
120        if last == 0 {
121            return false;
122        }
123        now_ms().saturating_sub(last) < REDIS_FAILURE_BACKOFF.as_millis() as u64
124    }
125
126    fn mark_redis_failure(&self) {
127        self.redis_last_failure_ms
128            .store(now_ms(), Ordering::Relaxed);
129    }
130
131    fn clear_redis_failure(&self) {
132        self.redis_last_failure_ms.store(0, Ordering::Relaxed);
133    }
134
135    async fn conn(&self) -> Option<ConnectionManager> {
136        let client = self.redis_client.as_ref()?;
137        // Fail fast while we're still inside the backoff window from a
138        // recent connect failure -- otherwise a Redis outage would stall
139        // every publish 2s at the ConnectionManager::new timeout, which
140        // in turn blocks the drainer and fills up the publish queue.
141        if self.in_redis_backoff() {
142            return None;
143        }
144        let mut guard = self.redis_conn.lock().await;
145        if let Some(existing) = guard.as_ref() {
146            return Some(existing.clone());
147        }
148        let conn = match tokio::time::timeout(
149            REDIS_COMMAND_TIMEOUT,
150            ConnectionManager::new(client.clone()),
151        )
152        .await
153        {
154            Ok(Ok(c)) => c,
155            Ok(Err(e)) => {
156                self.mark_redis_failure();
157                warn!(error = %e, "notifications: failed to open Redis ConnectionManager; bell cache disabled, backing off");
158                return None;
159            }
160            Err(_) => {
161                self.mark_redis_failure();
162                warn!("notifications: timed out opening Redis ConnectionManager; bell cache disabled, backing off");
163                return None;
164            }
165        };
166        *guard = Some(conn.clone());
167        self.clear_redis_failure();
168        Some(conn)
169    }
170
171    async fn set_has_unread(&self, wallet_lower: &str) {
172        let Some(mut conn) = self.conn().await else {
173            return;
174        };
175        let key = has_unread_key(wallet_lower);
176        let res: redis::RedisResult<()> =
177            tokio::time::timeout(REDIS_COMMAND_TIMEOUT, conn.set(&key, 1i32))
178                .await
179                .unwrap_or_else(|_| {
180                    Err(redis::RedisError::from((
181                        redis::ErrorKind::IoError,
182                        "timeout",
183                    )))
184                });
185        match res {
186            Ok(()) => debug!(wallet = wallet_lower, "notifications: has_unread bit set"),
187            Err(e) => {
188                warn!(error = %e, wallet = wallet_lower, "notifications: has_unread SET failed")
189            }
190        }
191    }
192
193    async fn clear_has_unread(&self, wallet_lower: &str) {
194        let Some(mut conn) = self.conn().await else {
195            return;
196        };
197        let key = has_unread_key(wallet_lower);
198        let res: redis::RedisResult<()> =
199            tokio::time::timeout(REDIS_COMMAND_TIMEOUT, conn.del(&key))
200                .await
201                .unwrap_or_else(|_| {
202                    Err(redis::RedisError::from((
203                        redis::ErrorKind::IoError,
204                        "timeout",
205                    )))
206                });
207        if let Err(e) = res {
208            warn!(error = %e, wallet = wallet_lower, "notifications: has_unread DEL failed");
209        }
210    }
211
212    /// After a mark-read batch clears the Redis bit, a concurrent `publish`
213    /// may have landed between our pg `count_unread` and our Redis `DEL`.
214    /// Re-check pg; if pg now has unread, re-`SET` so we don't leave the
215    /// bell stuck at cleared while rows are actually unread.
216    async fn reconcile_has_unread_after_clear(&self, wallet_lower: &str) {
217        match self.count_unread(wallet_lower).await {
218            Ok(0) => {}
219            Ok(_) => {
220                self.set_has_unread(wallet_lower).await;
221            }
222            Err(e) => {
223                warn!(error = %e, wallet = wallet_lower, "notifications: post-clear reconcile count_unread failed");
224            }
225        }
226    }
227
228    /// Fire-and-forget publish: enqueue onto the bounded channel. Returns
229    /// `Err` only if the channel is full (drainer can't keep up). The
230    /// caller is expected to warn + drop rather than block the trading
231    /// event pipeline.
232    pub fn enqueue_publish(
233        &self,
234        wallet: &str,
235        notification_type: &str,
236        payload_msgpack: Vec<u8>,
237    ) -> Result<(), ()> {
238        let job = PublishJob {
239            wallet_lower: wallet.to_lowercase(),
240            notification_type: notification_type.to_string(),
241            payload: payload_msgpack,
242        };
243        self.publish_tx.try_send(job).map_err(|e| {
244            warn!(error = %e, "notifications: publish queue full; dropping event");
245        })
246    }
247
248    /// Synchronous publish -- insert a notification and update the Redis
249    /// bit. Prefer [`enqueue_publish`] on the fill hot path; this method
250    /// is mainly for tests and paths where the caller genuinely wants to
251    /// wait on the insert.
252    pub async fn publish(
253        &self,
254        wallet: &str,
255        notification_type: &str,
256        payload_msgpack: Vec<u8>,
257    ) -> Result<NotificationRecord> {
258        let wallet_lower = wallet.to_lowercase();
259
260        let row = self
261            .db
262            .insert_notification(NewNotificationInput {
263                wallet_address: wallet_lower.clone(),
264                notification_type: notification_type.to_string(),
265                payload: payload_msgpack,
266            })
267            .await
268            .context("Failed to insert notification")?;
269
270        // Best-effort cache update -- never propagate Redis failures back
271        // up to the caller; the pg insert is the source of truth.
272        self.set_has_unread(&wallet_lower).await;
273
274        Ok(row)
275    }
276
277    pub async fn list(
278        &self,
279        wallet: &str,
280        before_id: Option<i64>,
281        limit: Option<i64>,
282    ) -> Result<Vec<NotificationRecord>> {
283        let effective_limit = limit.unwrap_or(DEFAULT_LIST_LIMIT).clamp(1, MAX_LIST_LIMIT);
284        let wallet_lower = wallet.to_lowercase();
285
286        self.db
287            .list_notifications(&wallet_lower, before_id, effective_limit)
288            .await
289            .context("Failed to list notifications")
290    }
291
292    pub async fn count_unread(&self, wallet: &str) -> Result<i64> {
293        let wallet_lower = wallet.to_lowercase();
294        self.db
295            .count_unread_notifications(&wallet_lower)
296            .await
297            .context("Failed to count unread notifications")
298    }
299
300    pub async fn mark_read(&self, wallet: &str, ids: &[i64]) -> Result<usize> {
301        if ids.is_empty() {
302            return Ok(0);
303        }
304        let wallet_lower = wallet.to_lowercase();
305
306        let updated = self
307            .db
308            .mark_notifications_read(&wallet_lower, ids, Utc::now())
309            .await
310            .context("Failed to mark notifications read")?;
311
312        if updated > 0 {
313            if let Ok(remaining) = self.count_unread(&wallet_lower).await {
314                if remaining == 0 {
315                    self.clear_has_unread(&wallet_lower).await;
316                    self.reconcile_has_unread_after_clear(&wallet_lower).await;
317                }
318            }
319        }
320        Ok(updated)
321    }
322
323    pub async fn mark_all_read(&self, wallet: &str) -> Result<usize> {
324        let wallet_lower = wallet.to_lowercase();
325
326        let updated = self
327            .db
328            .mark_all_notifications_read(&wallet_lower, Utc::now())
329            .await
330            .context("Failed to mark all notifications read")?;
331
332        self.clear_has_unread(&wallet_lower).await;
333        self.reconcile_has_unread_after_clear(&wallet_lower).await;
334        Ok(updated)
335    }
336}
337
338async fn drain_publish_queue(
339    svc: Arc<NotificationService>,
340    mut rx: mpsc::Receiver<PublishJob>,
341    mut shutdown_rx: broadcast::Receiver<()>,
342) {
343    loop {
344        tokio::select! {
345            maybe_job = rx.recv() => {
346                let Some(job) = maybe_job else { break };
347                drain_one(&svc, job).await;
348            }
349            _ = shutdown_rx.recv() => {
350                info!("notifications: shutdown signal received; flushing pending publishes");
351                // Try to flush everything already enqueued up to a bounded
352                // deadline, then exit.
353                let deadline = tokio::time::Instant::now() + SHUTDOWN_DRAIN_TIMEOUT;
354                loop {
355                    match tokio::time::timeout_at(deadline, rx.recv()).await {
356                        Ok(Some(job)) => drain_one(&svc, job).await,
357                        Ok(None) => break,
358                        Err(_) => {
359                            let remaining = rx.len();
360                            if remaining > 0 {
361                                warn!(
362                                    remaining,
363                                    "notifications: shutdown drain timed out; dropping pending publishes"
364                                );
365                            }
366                            break;
367                        }
368                    }
369                }
370                break;
371            }
372        }
373    }
374    info!("notifications: publish drainer exiting");
375}
376
377async fn drain_one(svc: &NotificationService, job: PublishJob) {
378    let PublishJob {
379        wallet_lower: wallet,
380        notification_type: kind,
381        payload,
382    } = job;
383    if let Err(e) = svc.publish(&wallet, &kind, payload).await {
384        warn!(error = %e, wallet = wallet, kind = kind, "notifications: fire-and-forget publish failed");
385    }
386}