hypercall_db_diesel/
notifications.rs1use anyhow::Result;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use diesel::prelude::*;
10use diesel_async::RunQueryDsl;
11
12use crate::diesel_db::DieselDb;
13use crate::models::{NewNotification, Notification};
14use crate::schema::notifications;
15use hypercall_db::{
16 NewNotificationInput, NotificationReader, NotificationRecord, NotificationWriter,
17};
18
19impl From<Notification> for NotificationRecord {
22 fn from(row: Notification) -> Self {
23 Self {
24 id: row.id,
25 wallet_address: row.wallet_address,
26 notification_type: row.notification_type,
27 payload: row.payload,
28 read_at: row.read_at,
29 created_at: row.created_at,
30 }
31 }
32}
33
34#[async_trait]
39impl NotificationReader for DieselDb {
40 async fn list_notifications(
41 &self,
42 wallet: &str,
43 before_id: Option<i64>,
44 limit: i64,
45 ) -> Result<Vec<NotificationRecord>> {
46 let mut conn = self.get_conn().await?;
47 let wallet_lower = wallet.to_lowercase();
48
49 let mut query = notifications::table
50 .filter(notifications::wallet_address.eq(&wallet_lower))
51 .order(notifications::id.desc())
52 .limit(limit)
53 .into_boxed();
54
55 if let Some(before) = before_id {
56 query = query.filter(notifications::id.lt(before));
57 }
58
59 let rows = query
60 .select(Notification::as_select())
61 .load::<Notification>(&mut conn)
62 .await?;
63
64 Ok(rows.into_iter().map(Into::into).collect())
65 }
66
67 async fn count_unread_notifications(&self, wallet: &str) -> Result<i64> {
68 let mut conn = self.get_conn().await?;
69 let wallet_lower = wallet.to_lowercase();
70
71 let count = notifications::table
72 .filter(notifications::wallet_address.eq(&wallet_lower))
73 .filter(notifications::read_at.is_null())
74 .count()
75 .get_result::<i64>(&mut conn)
76 .await?;
77
78 Ok(count)
79 }
80}
81
82#[async_trait]
87impl NotificationWriter for DieselDb {
88 async fn insert_notification(&self, input: NewNotificationInput) -> Result<NotificationRecord> {
89 let mut conn = self.get_conn().await?;
90
91 let row = diesel::insert_into(notifications::table)
92 .values(NewNotification {
93 wallet_address: input.wallet_address,
94 notification_type: input.notification_type,
95 payload: input.payload,
96 })
97 .get_result::<Notification>(&mut conn)
98 .await?;
99
100 Ok(row.into())
101 }
102
103 async fn mark_notifications_read(
104 &self,
105 wallet: &str,
106 ids: &[i64],
107 now: DateTime<Utc>,
108 ) -> Result<usize> {
109 if ids.is_empty() {
110 return Ok(0);
111 }
112 let mut conn = self.get_conn().await?;
113 let wallet_lower = wallet.to_lowercase();
114
115 let updated = diesel::update(
116 notifications::table
117 .filter(notifications::wallet_address.eq(&wallet_lower))
118 .filter(notifications::id.eq_any(ids))
119 .filter(notifications::read_at.is_null()),
120 )
121 .set(notifications::read_at.eq(now))
122 .execute(&mut conn)
123 .await?;
124
125 Ok(updated)
126 }
127
128 async fn mark_all_notifications_read(&self, wallet: &str, now: DateTime<Utc>) -> Result<usize> {
129 let mut conn = self.get_conn().await?;
130 let wallet_lower = wallet.to_lowercase();
131
132 let updated = diesel::update(
133 notifications::table
134 .filter(notifications::wallet_address.eq(&wallet_lower))
135 .filter(notifications::read_at.is_null()),
136 )
137 .set(notifications::read_at.eq(now))
138 .execute(&mut conn)
139 .await?;
140
141 Ok(updated)
142 }
143}