1use anyhow::{Context, Result};
39use chrono::Utc;
40use redis::aio::ConnectionManager;
41use redis::AsyncCommands;
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::Arc;
44use std::time::Duration;
45use tokio::sync::{broadcast, mpsc, Mutex};
46use tracing::{debug, info, warn};
47
48use hypercall_db::{NewNotificationInput, NotificationRecord, NotificationWriter};
49
50const HAS_UNREAD_KEY_PREFIX: &str = "notifications:has_unread:";
51const DEFAULT_LIST_LIMIT: i64 = 50;
52const MAX_LIST_LIMIT: i64 = 100;
53const REDIS_COMMAND_TIMEOUT: Duration = Duration::from_secs(2);
54const REDIS_FAILURE_BACKOFF: Duration = Duration::from_secs(30);
59const PUBLISH_QUEUE_CAPACITY: usize = 1_000;
63const SHUTDOWN_DRAIN_TIMEOUT: Duration = Duration::from_secs(5);
67
68fn now_ms() -> u64 {
69 std::time::SystemTime::now()
70 .duration_since(std::time::UNIX_EPOCH)
71 .map(|d| d.as_millis() as u64)
72 .unwrap_or(0)
73}
74
75fn has_unread_key(wallet_lower: &str) -> String {
76 format!("{HAS_UNREAD_KEY_PREFIX}{wallet_lower}")
77}
78
79struct PublishJob {
80 wallet_lower: String,
81 notification_type: String,
82 payload: Vec<u8>,
83}
84
85pub struct NotificationService {
86 db: Arc<dyn NotificationWriter>,
87 redis_client: Option<redis::Client>,
90 redis_conn: Arc<Mutex<Option<ConnectionManager>>>,
91 redis_last_failure_ms: Arc<AtomicU64>,
95 publish_tx: mpsc::Sender<PublishJob>,
96}
97
98impl NotificationService {
99 pub fn new(
100 db: Arc<dyn NotificationWriter>,
101 redis_client: Option<redis::Client>,
102 shutdown_rx: broadcast::Receiver<()>,
103 ) -> Arc<Self> {
104 let (publish_tx, publish_rx) = mpsc::channel::<PublishJob>(PUBLISH_QUEUE_CAPACITY);
105 let svc = Arc::new(Self {
106 db,
107 redis_client,
108 redis_conn: Arc::new(Mutex::new(None)),
109 redis_last_failure_ms: Arc::new(AtomicU64::new(0)),
110 publish_tx,
111 });
112 let drainer = svc.clone();
113 tokio::spawn(drain_publish_queue(drainer, publish_rx, shutdown_rx));
114 svc
115 }
116
117 fn in_redis_backoff(&self) -> bool {
119 let last = self.redis_last_failure_ms.load(Ordering::Relaxed);
120 if last == 0 {
121 return false;
122 }
123 now_ms().saturating_sub(last) < REDIS_FAILURE_BACKOFF.as_millis() as u64
124 }
125
126 fn mark_redis_failure(&self) {
127 self.redis_last_failure_ms
128 .store(now_ms(), Ordering::Relaxed);
129 }
130
131 fn clear_redis_failure(&self) {
132 self.redis_last_failure_ms.store(0, Ordering::Relaxed);
133 }
134
135 async fn conn(&self) -> Option<ConnectionManager> {
136 let client = self.redis_client.as_ref()?;
137 if self.in_redis_backoff() {
142 return None;
143 }
144 let mut guard = self.redis_conn.lock().await;
145 if let Some(existing) = guard.as_ref() {
146 return Some(existing.clone());
147 }
148 let conn = match tokio::time::timeout(
149 REDIS_COMMAND_TIMEOUT,
150 ConnectionManager::new(client.clone()),
151 )
152 .await
153 {
154 Ok(Ok(c)) => c,
155 Ok(Err(e)) => {
156 self.mark_redis_failure();
157 warn!(error = %e, "notifications: failed to open Redis ConnectionManager; bell cache disabled, backing off");
158 return None;
159 }
160 Err(_) => {
161 self.mark_redis_failure();
162 warn!("notifications: timed out opening Redis ConnectionManager; bell cache disabled, backing off");
163 return None;
164 }
165 };
166 *guard = Some(conn.clone());
167 self.clear_redis_failure();
168 Some(conn)
169 }
170
171 async fn set_has_unread(&self, wallet_lower: &str) {
172 let Some(mut conn) = self.conn().await else {
173 return;
174 };
175 let key = has_unread_key(wallet_lower);
176 let res: redis::RedisResult<()> =
177 tokio::time::timeout(REDIS_COMMAND_TIMEOUT, conn.set(&key, 1i32))
178 .await
179 .unwrap_or_else(|_| {
180 Err(redis::RedisError::from((
181 redis::ErrorKind::IoError,
182 "timeout",
183 )))
184 });
185 match res {
186 Ok(()) => debug!(wallet = wallet_lower, "notifications: has_unread bit set"),
187 Err(e) => {
188 warn!(error = %e, wallet = wallet_lower, "notifications: has_unread SET failed")
189 }
190 }
191 }
192
193 async fn clear_has_unread(&self, wallet_lower: &str) {
194 let Some(mut conn) = self.conn().await else {
195 return;
196 };
197 let key = has_unread_key(wallet_lower);
198 let res: redis::RedisResult<()> =
199 tokio::time::timeout(REDIS_COMMAND_TIMEOUT, conn.del(&key))
200 .await
201 .unwrap_or_else(|_| {
202 Err(redis::RedisError::from((
203 redis::ErrorKind::IoError,
204 "timeout",
205 )))
206 });
207 if let Err(e) = res {
208 warn!(error = %e, wallet = wallet_lower, "notifications: has_unread DEL failed");
209 }
210 }
211
212 async fn reconcile_has_unread_after_clear(&self, wallet_lower: &str) {
217 match self.count_unread(wallet_lower).await {
218 Ok(0) => {}
219 Ok(_) => {
220 self.set_has_unread(wallet_lower).await;
221 }
222 Err(e) => {
223 warn!(error = %e, wallet = wallet_lower, "notifications: post-clear reconcile count_unread failed");
224 }
225 }
226 }
227
228 pub fn enqueue_publish(
233 &self,
234 wallet: &str,
235 notification_type: &str,
236 payload_msgpack: Vec<u8>,
237 ) -> Result<(), ()> {
238 let job = PublishJob {
239 wallet_lower: wallet.to_lowercase(),
240 notification_type: notification_type.to_string(),
241 payload: payload_msgpack,
242 };
243 self.publish_tx.try_send(job).map_err(|e| {
244 warn!(error = %e, "notifications: publish queue full; dropping event");
245 })
246 }
247
248 pub async fn publish(
253 &self,
254 wallet: &str,
255 notification_type: &str,
256 payload_msgpack: Vec<u8>,
257 ) -> Result<NotificationRecord> {
258 let wallet_lower = wallet.to_lowercase();
259
260 let row = self
261 .db
262 .insert_notification(NewNotificationInput {
263 wallet_address: wallet_lower.clone(),
264 notification_type: notification_type.to_string(),
265 payload: payload_msgpack,
266 })
267 .await
268 .context("Failed to insert notification")?;
269
270 self.set_has_unread(&wallet_lower).await;
273
274 Ok(row)
275 }
276
277 pub async fn list(
278 &self,
279 wallet: &str,
280 before_id: Option<i64>,
281 limit: Option<i64>,
282 ) -> Result<Vec<NotificationRecord>> {
283 let effective_limit = limit.unwrap_or(DEFAULT_LIST_LIMIT).clamp(1, MAX_LIST_LIMIT);
284 let wallet_lower = wallet.to_lowercase();
285
286 self.db
287 .list_notifications(&wallet_lower, before_id, effective_limit)
288 .await
289 .context("Failed to list notifications")
290 }
291
292 pub async fn count_unread(&self, wallet: &str) -> Result<i64> {
293 let wallet_lower = wallet.to_lowercase();
294 self.db
295 .count_unread_notifications(&wallet_lower)
296 .await
297 .context("Failed to count unread notifications")
298 }
299
300 pub async fn mark_read(&self, wallet: &str, ids: &[i64]) -> Result<usize> {
301 if ids.is_empty() {
302 return Ok(0);
303 }
304 let wallet_lower = wallet.to_lowercase();
305
306 let updated = self
307 .db
308 .mark_notifications_read(&wallet_lower, ids, Utc::now())
309 .await
310 .context("Failed to mark notifications read")?;
311
312 if updated > 0 {
313 if let Ok(remaining) = self.count_unread(&wallet_lower).await {
314 if remaining == 0 {
315 self.clear_has_unread(&wallet_lower).await;
316 self.reconcile_has_unread_after_clear(&wallet_lower).await;
317 }
318 }
319 }
320 Ok(updated)
321 }
322
323 pub async fn mark_all_read(&self, wallet: &str) -> Result<usize> {
324 let wallet_lower = wallet.to_lowercase();
325
326 let updated = self
327 .db
328 .mark_all_notifications_read(&wallet_lower, Utc::now())
329 .await
330 .context("Failed to mark all notifications read")?;
331
332 self.clear_has_unread(&wallet_lower).await;
333 self.reconcile_has_unread_after_clear(&wallet_lower).await;
334 Ok(updated)
335 }
336}
337
338async fn drain_publish_queue(
339 svc: Arc<NotificationService>,
340 mut rx: mpsc::Receiver<PublishJob>,
341 mut shutdown_rx: broadcast::Receiver<()>,
342) {
343 loop {
344 tokio::select! {
345 maybe_job = rx.recv() => {
346 let Some(job) = maybe_job else { break };
347 drain_one(&svc, job).await;
348 }
349 _ = shutdown_rx.recv() => {
350 info!("notifications: shutdown signal received; flushing pending publishes");
351 let deadline = tokio::time::Instant::now() + SHUTDOWN_DRAIN_TIMEOUT;
354 loop {
355 match tokio::time::timeout_at(deadline, rx.recv()).await {
356 Ok(Some(job)) => drain_one(&svc, job).await,
357 Ok(None) => break,
358 Err(_) => {
359 let remaining = rx.len();
360 if remaining > 0 {
361 warn!(
362 remaining,
363 "notifications: shutdown drain timed out; dropping pending publishes"
364 );
365 }
366 break;
367 }
368 }
369 }
370 break;
371 }
372 }
373 }
374 info!("notifications: publish drainer exiting");
375}
376
377async fn drain_one(svc: &NotificationService, job: PublishJob) {
378 let PublishJob {
379 wallet_lower: wallet,
380 notification_type: kind,
381 payload,
382 } = job;
383 if let Err(e) = svc.publish(&wallet, &kind, payload).await {
384 warn!(error = %e, wallet = wallet, kind = kind, "notifications: fire-and-forget publish failed");
385 }
386}