1use diesel::prelude::*;
8use diesel_async::{AsyncPgConnection, RunQueryDsl};
9
10use crate::diesel_db::DieselDb;
11
12#[async_trait::async_trait]
17impl hypercall_db::IntegrityReader for DieselDb {
18 async fn get_integrity_query_results(&self) -> hypercall_db::IntegrityQueryResults {
19 let mut conn = match self.get_conn().await {
20 Ok(c) => c,
21 Err(e) => {
22 let m = e.to_string();
23 return hypercall_db::IntegrityQueryResults {
24 fill_volume: Err(anyhow::anyhow!(m.clone())),
25 settlement_stats: Err(anyhow::anyhow!(m.clone())),
26 open_interest_by_underlying: Err(anyhow::anyhow!(m.clone())),
27 ledger_events_settlement_total: Err(anyhow::anyhow!(m.clone())),
28 orphaned_settlement_orders: Err(anyhow::anyhow!(m)),
29 };
30 }
31 };
32 hypercall_db::IntegrityQueryResults {
33 fill_volume: integrity_query_fill_volume(&mut conn).await,
34 settlement_stats: integrity_query_settlement(&mut conn).await,
35 open_interest_by_underlying: integrity_query_open_interest(&mut conn).await,
36 ledger_events_settlement_total: integrity_query_ledger_settlement_total(&mut conn)
37 .await,
38 orphaned_settlement_orders: integrity_query_orphaned_orders(&mut conn).await,
39 }
40 }
41
42 #[cfg(feature = "test-utils")]
43 async fn get_settlement_integrity(
44 &self,
45 ) -> anyhow::Result<(i64, i64, i64, rust_decimal::Decimal)> {
46 let mut conn = self.get_conn().await?;
47 integrity_query_settlement(&mut conn).await
48 }
49
50 #[cfg(feature = "test-utils")]
51 async fn get_open_interest_by_underlying(
52 &self,
53 ) -> anyhow::Result<Vec<(String, rust_decimal::Decimal)>> {
54 let mut conn = self.get_conn().await?;
55 integrity_query_open_interest(&mut conn).await
56 }
57}
58
59async fn integrity_query_fill_volume(
60 c: &mut AsyncPgConnection,
61) -> anyhow::Result<(i64, rust_decimal::Decimal)> {
62 #[derive(QueryableByName)]
63 struct R {
64 #[diesel(sql_type = diesel::sql_types::BigInt)]
65 fill_count: i64,
66 #[diesel(sql_type = diesel::sql_types::Numeric)]
67 total_notional: rust_decimal::Decimal,
68 }
69 let r = diesel::sql_query("SELECT COUNT(*)::bigint AS fill_count, COALESCE(SUM(ABS(size)/1000000.0*price),0)::numeric AS total_notional FROM fills").get_result::<R>(c).await?;
70 Ok((r.fill_count, r.total_notional))
71}
72
73async fn integrity_query_settlement(
74 c: &mut AsyncPgConnection,
75) -> anyhow::Result<(i64, i64, i64, rust_decimal::Decimal)> {
76 #[derive(QueryableByName)]
77 struct R {
78 #[diesel(sql_type = diesel::sql_types::BigInt)]
79 total: i64,
80 #[diesel(sql_type = diesel::sql_types::BigInt)]
81 applied: i64,
82 #[diesel(sql_type = diesel::sql_types::BigInt)]
83 pending: i64,
84 #[diesel(sql_type = diesel::sql_types::Numeric)]
85 total_value: rust_decimal::Decimal,
86 }
87 let r = diesel::sql_query("SELECT COUNT(*)::bigint AS total, COUNT(*) FILTER (WHERE ledger_applied=true)::bigint AS applied, COUNT(*) FILTER (WHERE ledger_applied=false)::bigint AS pending, COALESCE(SUM(payout_amount),0)::numeric AS total_value FROM settlement_payouts").get_result::<R>(c).await?;
88 Ok((r.total, r.applied, r.pending, r.total_value))
89}
90
91async fn integrity_query_open_interest(
92 c: &mut AsyncPgConnection,
93) -> anyhow::Result<Vec<(String, rust_decimal::Decimal)>> {
94 #[derive(QueryableByName)]
95 struct R {
96 #[diesel(sql_type = diesel::sql_types::Text)]
97 underlying: String,
98 #[diesel(sql_type = diesel::sql_types::Numeric)]
99 open_interest: rust_decimal::Decimal,
100 }
101 Ok(diesel::sql_query("SELECT i.underlying, SUM(ABS(p.amount))::numeric AS open_interest FROM positions p JOIN instruments i ON p.option_id=i.id WHERE p.amount!=0 GROUP BY i.underlying").load::<R>(c).await?.into_iter().map(|r|(r.underlying, r.open_interest)).collect())
102}
103
104async fn integrity_query_ledger_settlement_total(
105 c: &mut AsyncPgConnection,
106) -> anyhow::Result<rust_decimal::Decimal> {
107 #[derive(QueryableByName)]
108 struct R {
109 #[diesel(sql_type = diesel::sql_types::Numeric)]
110 total: rust_decimal::Decimal,
111 }
112 Ok(diesel::sql_query("SELECT COALESCE(SUM(delta),0)::numeric AS total FROM ledger_events WHERE event_type='settlement_realized_pnl'").get_result::<R>(c).await?.total)
113}
114
115async fn integrity_query_orphaned_orders(c: &mut AsyncPgConnection) -> anyhow::Result<i64> {
116 use diesel::sql_types::BigInt;
117 #[derive(QueryableByName)]
118 struct CR {
119 #[diesel(sql_type = BigInt)]
120 count: i64,
121 }
122 #[derive(QueryableByName)]
123 struct WR {
124 #[diesel(sql_type = BigInt)]
125 watermark_ms: i64,
126 }
127 let wm = diesel::sql_query("SELECT COALESCE((SELECT watermark_ms FROM integrity_watermarks WHERE key='orphaned_settlement_orders'),(extract(epoch from now()-interval '40 minutes')*1000)::bigint) AS watermark_ms").get_result::<WR>(c).await.map(|r|r.watermark_ms).unwrap_or(0);
128 let ub = diesel::sql_query(
129 "SELECT (extract(epoch from now()-interval '10 minutes')*1000)::bigint AS watermark_ms",
130 )
131 .get_result::<WR>(c)
132 .await
133 .map(|r| r.watermark_ms)
134 .unwrap_or(0);
135 let count = diesel::sql_query("SELECT COUNT(*)::bigint AS count FROM order_infos oi JOIN instruments i ON oi.symbol=i.id WHERE i.status='SETTLED' AND oi.status IN ('OPEN','PARTIALLY_FILLED','ACKED') AND (EXISTS (SELECT 1 FROM market_updates mu WHERE mu.symbol=oi.symbol AND mu.status='MarketExpired' AND mu.timestamp<$1) OR (NOT EXISTS (SELECT 1 FROM market_updates mu_missing WHERE mu_missing.symbol=oi.symbol AND mu_missing.status='MarketExpired') AND (i.expiry*1000)<$1))").bind::<BigInt,_>(ub).get_result::<CR>(c).await?.count;
136 if count == 0 && ub > wm {
137 let _ = diesel::sql_query("INSERT INTO integrity_watermarks (key,watermark_ms,updated_at) VALUES ('orphaned_settlement_orders',$1,now()) ON CONFLICT (key) DO UPDATE SET watermark_ms=$1,updated_at=now()").bind::<BigInt,_>(ub).execute(c).await;
138 }
139 Ok(count)
140}
141
142#[cfg(test)]
143mod tests {
144 use crate::test_helpers::TestDb;
145 use diesel::prelude::*;
146 use diesel::sql_query;
147 use hypercall_db::*;
148 use hypercall_types::wallet_address::test_wallet;
149 use rust_decimal_macros::dec;
150
151 #[tokio::test]
152 async fn integrity_query_results_on_empty_db() {
153 let test_db = TestDb::new().await.unwrap();
154 let db = test_db.diesel_db().await;
155 let integrity: &dyn IntegrityReader = &db;
156 let results = integrity.get_integrity_query_results().await;
157 assert!(results.fill_volume.is_ok());
158 assert!(results.settlement_stats.is_ok());
159 }
160
161 #[cfg(feature = "test-utils")]
162 #[tokio::test]
163 async fn integrity_settlement_stats_empty() {
164 let test_db = TestDb::new().await.unwrap();
165 let db = test_db.diesel_db().await;
166 let integrity: &dyn IntegrityReader = &db;
167 let (total, applied, pending, value) = integrity.get_settlement_integrity().await.unwrap();
168 assert_eq!(total, 0);
169 assert_eq!(applied, 0);
170 assert_eq!(pending, 0);
171 assert_eq!(value, dec!(0));
172 }
173
174 #[tokio::test]
175 async fn integrity_fill_volume_after_inserts() {
176 let test_db = TestDb::new().await.unwrap();
177 let wallet = test_wallet(60);
178
179 {
182 let mut conn = test_db.handler.pool().get().unwrap();
183
184 sql_query(
186 "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp) \
187 VALUES (1, 'BTC-25DEC26-100000-C', 50000, 1000000, $1, $1, 10, 10, 1000)",
188 )
189 .bind::<diesel::sql_types::Binary, _>(&wallet)
190 .execute(&mut conn)
191 .unwrap();
192
193 sql_query(
194 "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp) \
195 VALUES (2, 'BTC-25DEC26-100000-C', 60000, 2000000, $1, $1, 20, 20, 2000)",
196 )
197 .bind::<diesel::sql_types::Binary, _>(&wallet)
198 .execute(&mut conn)
199 .unwrap();
200
201 sql_query(
202 "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp) \
203 VALUES (1, $1, 'BTC-25DEC26-100000-C', 50000, 1000000, 10, true, 1000)",
204 )
205 .bind::<diesel::sql_types::Binary, _>(&wallet)
206 .execute(&mut conn)
207 .unwrap();
208
209 sql_query(
210 "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp) \
211 VALUES (2, $1, 'BTC-25DEC26-100000-C', 60000, -2000000, 20, false, 2000)",
212 )
213 .bind::<diesel::sql_types::Binary, _>(&wallet)
214 .execute(&mut conn)
215 .unwrap();
216 }
217
218 let db = test_db.diesel_db().await;
219 let integrity: &dyn IntegrityReader = &db;
220 let results = integrity.get_integrity_query_results().await;
221 let (count, notional) = results.fill_volume.unwrap();
222 assert_eq!(count, 2);
223 assert_eq!(notional, dec!(170000));
225 }
226}