Skip to main content

hypercall_db_diesel/
notifications.rs

1//! NotificationReader + NotificationWriter trait implementations for DieselDb.
2//!
3//! Persists the in-app notification feed (list, insert, mark-read).
4//! Redis caching of the unread bit stays in the service layer.
5
6use anyhow::Result;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use diesel::prelude::*;
10use diesel_async::RunQueryDsl;
11
12use crate::diesel_db::DieselDb;
13use crate::models::{NewNotification, Notification};
14use crate::schema::notifications;
15use hypercall_db::{
16    NewNotificationInput, NotificationReader, NotificationRecord, NotificationWriter,
17};
18
19// -- Conversions from Diesel model to domain type --
20
21impl From<Notification> for NotificationRecord {
22    fn from(row: Notification) -> Self {
23        Self {
24            id: row.id,
25            wallet_address: row.wallet_address,
26            notification_type: row.notification_type,
27            payload: row.payload,
28            read_at: row.read_at,
29            created_at: row.created_at,
30        }
31    }
32}
33
34// =============================================================================
35// NotificationReader
36// =============================================================================
37
38#[async_trait]
39impl NotificationReader for DieselDb {
40    async fn list_notifications(
41        &self,
42        wallet: &str,
43        before_id: Option<i64>,
44        limit: i64,
45    ) -> Result<Vec<NotificationRecord>> {
46        let mut conn = self.get_conn().await?;
47        let wallet_lower = wallet.to_lowercase();
48
49        let mut query = notifications::table
50            .filter(notifications::wallet_address.eq(&wallet_lower))
51            .order(notifications::id.desc())
52            .limit(limit)
53            .into_boxed();
54
55        if let Some(before) = before_id {
56            query = query.filter(notifications::id.lt(before));
57        }
58
59        let rows = query
60            .select(Notification::as_select())
61            .load::<Notification>(&mut conn)
62            .await?;
63
64        Ok(rows.into_iter().map(Into::into).collect())
65    }
66
67    async fn count_unread_notifications(&self, wallet: &str) -> Result<i64> {
68        let mut conn = self.get_conn().await?;
69        let wallet_lower = wallet.to_lowercase();
70
71        let count = notifications::table
72            .filter(notifications::wallet_address.eq(&wallet_lower))
73            .filter(notifications::read_at.is_null())
74            .count()
75            .get_result::<i64>(&mut conn)
76            .await?;
77
78        Ok(count)
79    }
80}
81
82// =============================================================================
83// NotificationWriter
84// =============================================================================
85
86#[async_trait]
87impl NotificationWriter for DieselDb {
88    async fn insert_notification(&self, input: NewNotificationInput) -> Result<NotificationRecord> {
89        let mut conn = self.get_conn().await?;
90
91        let row = diesel::insert_into(notifications::table)
92            .values(NewNotification {
93                wallet_address: input.wallet_address,
94                notification_type: input.notification_type,
95                payload: input.payload,
96            })
97            .get_result::<Notification>(&mut conn)
98            .await?;
99
100        Ok(row.into())
101    }
102
103    async fn mark_notifications_read(
104        &self,
105        wallet: &str,
106        ids: &[i64],
107        now: DateTime<Utc>,
108    ) -> Result<usize> {
109        if ids.is_empty() {
110            return Ok(0);
111        }
112        let mut conn = self.get_conn().await?;
113        let wallet_lower = wallet.to_lowercase();
114
115        let updated = diesel::update(
116            notifications::table
117                .filter(notifications::wallet_address.eq(&wallet_lower))
118                .filter(notifications::id.eq_any(ids))
119                .filter(notifications::read_at.is_null()),
120        )
121        .set(notifications::read_at.eq(now))
122        .execute(&mut conn)
123        .await?;
124
125        Ok(updated)
126    }
127
128    async fn mark_all_notifications_read(&self, wallet: &str, now: DateTime<Utc>) -> Result<usize> {
129        let mut conn = self.get_conn().await?;
130        let wallet_lower = wallet.to_lowercase();
131
132        let updated = diesel::update(
133            notifications::table
134                .filter(notifications::wallet_address.eq(&wallet_lower))
135                .filter(notifications::read_at.is_null()),
136        )
137        .set(notifications::read_at.eq(now))
138        .execute(&mut conn)
139        .await?;
140
141        Ok(updated)
142    }
143}