Skip to main content

hypercall_db_diesel/
integrity.rs

//! IntegrityReader trait implementation for DieselDb.
//!
//! Runs independent DB-backed integrity checks on a single connection for
//! snapshot consistency. Current balances are engine-owned and are not queried
//! from DB projections.
6
7use diesel::prelude::*;
8use diesel_async::{AsyncPgConnection, RunQueryDsl};
9
10use crate::diesel_db::DieselDb;
11
// =============================================================================
// IntegrityReader (async, diesel-async)
// =============================================================================
15
16#[async_trait::async_trait]
17impl hypercall_db::IntegrityReader for DieselDb {
18    async fn get_integrity_query_results(&self) -> hypercall_db::IntegrityQueryResults {
19        let mut conn = match self.get_conn().await {
20            Ok(c) => c,
21            Err(e) => {
22                let m = e.to_string();
23                return hypercall_db::IntegrityQueryResults {
24                    fill_volume: Err(anyhow::anyhow!(m.clone())),
25                    settlement_stats: Err(anyhow::anyhow!(m.clone())),
26                    open_interest_by_underlying: Err(anyhow::anyhow!(m.clone())),
27                    ledger_events_settlement_total: Err(anyhow::anyhow!(m.clone())),
28                    orphaned_settlement_orders: Err(anyhow::anyhow!(m)),
29                };
30            }
31        };
32        hypercall_db::IntegrityQueryResults {
33            fill_volume: integrity_query_fill_volume(&mut conn).await,
34            settlement_stats: integrity_query_settlement(&mut conn).await,
35            open_interest_by_underlying: integrity_query_open_interest(&mut conn).await,
36            ledger_events_settlement_total: integrity_query_ledger_settlement_total(&mut conn)
37                .await,
38            orphaned_settlement_orders: integrity_query_orphaned_orders(&mut conn).await,
39        }
40    }
41
42    #[cfg(feature = "test-utils")]
43    async fn get_settlement_integrity(
44        &self,
45    ) -> anyhow::Result<(i64, i64, i64, rust_decimal::Decimal)> {
46        let mut conn = self.get_conn().await?;
47        integrity_query_settlement(&mut conn).await
48    }
49
50    #[cfg(feature = "test-utils")]
51    async fn get_open_interest_by_underlying(
52        &self,
53    ) -> anyhow::Result<Vec<(String, rust_decimal::Decimal)>> {
54        let mut conn = self.get_conn().await?;
55        integrity_query_open_interest(&mut conn).await
56    }
57}
58
59async fn integrity_query_fill_volume(
60    c: &mut AsyncPgConnection,
61) -> anyhow::Result<(i64, rust_decimal::Decimal)> {
62    #[derive(QueryableByName)]
63    struct R {
64        #[diesel(sql_type = diesel::sql_types::BigInt)]
65        fill_count: i64,
66        #[diesel(sql_type = diesel::sql_types::Numeric)]
67        total_notional: rust_decimal::Decimal,
68    }
69    let r = diesel::sql_query("SELECT COUNT(*)::bigint AS fill_count, COALESCE(SUM(ABS(size)/1000000.0*price),0)::numeric AS total_notional FROM fills").get_result::<R>(c).await?;
70    Ok((r.fill_count, r.total_notional))
71}
72
73async fn integrity_query_settlement(
74    c: &mut AsyncPgConnection,
75) -> anyhow::Result<(i64, i64, i64, rust_decimal::Decimal)> {
76    #[derive(QueryableByName)]
77    struct R {
78        #[diesel(sql_type = diesel::sql_types::BigInt)]
79        total: i64,
80        #[diesel(sql_type = diesel::sql_types::BigInt)]
81        applied: i64,
82        #[diesel(sql_type = diesel::sql_types::BigInt)]
83        pending: i64,
84        #[diesel(sql_type = diesel::sql_types::Numeric)]
85        total_value: rust_decimal::Decimal,
86    }
87    let r = diesel::sql_query("SELECT COUNT(*)::bigint AS total, COUNT(*) FILTER (WHERE ledger_applied=true)::bigint AS applied, COUNT(*) FILTER (WHERE ledger_applied=false)::bigint AS pending, COALESCE(SUM(payout_amount),0)::numeric AS total_value FROM settlement_payouts").get_result::<R>(c).await?;
88    Ok((r.total, r.applied, r.pending, r.total_value))
89}
90
91async fn integrity_query_open_interest(
92    c: &mut AsyncPgConnection,
93) -> anyhow::Result<Vec<(String, rust_decimal::Decimal)>> {
94    #[derive(QueryableByName)]
95    struct R {
96        #[diesel(sql_type = diesel::sql_types::Text)]
97        underlying: String,
98        #[diesel(sql_type = diesel::sql_types::Numeric)]
99        open_interest: rust_decimal::Decimal,
100    }
101    Ok(diesel::sql_query("SELECT i.underlying, SUM(ABS(p.amount))::numeric AS open_interest FROM positions p JOIN instruments i ON p.option_id=i.id WHERE p.amount!=0 GROUP BY i.underlying").load::<R>(c).await?.into_iter().map(|r|(r.underlying, r.open_interest)).collect())
102}
103
104async fn integrity_query_ledger_settlement_total(
105    c: &mut AsyncPgConnection,
106) -> anyhow::Result<rust_decimal::Decimal> {
107    #[derive(QueryableByName)]
108    struct R {
109        #[diesel(sql_type = diesel::sql_types::Numeric)]
110        total: rust_decimal::Decimal,
111    }
112    Ok(diesel::sql_query("SELECT COALESCE(SUM(delta),0)::numeric AS total FROM ledger_events WHERE event_type='settlement_realized_pnl'").get_result::<R>(c).await?.total)
113}
114
115async fn integrity_query_orphaned_orders(c: &mut AsyncPgConnection) -> anyhow::Result<i64> {
116    use diesel::sql_types::BigInt;
117    #[derive(QueryableByName)]
118    struct CR {
119        #[diesel(sql_type = BigInt)]
120        count: i64,
121    }
122    #[derive(QueryableByName)]
123    struct WR {
124        #[diesel(sql_type = BigInt)]
125        watermark_ms: i64,
126    }
127    let wm = diesel::sql_query("SELECT COALESCE((SELECT watermark_ms FROM integrity_watermarks WHERE key='orphaned_settlement_orders'),(extract(epoch from now()-interval '40 minutes')*1000)::bigint) AS watermark_ms").get_result::<WR>(c).await.map(|r|r.watermark_ms).unwrap_or(0);
128    let ub = diesel::sql_query(
129        "SELECT (extract(epoch from now()-interval '10 minutes')*1000)::bigint AS watermark_ms",
130    )
131    .get_result::<WR>(c)
132    .await
133    .map(|r| r.watermark_ms)
134    .unwrap_or(0);
135    let count = diesel::sql_query("SELECT COUNT(*)::bigint AS count FROM order_infos oi JOIN instruments i ON oi.symbol=i.id WHERE i.status='SETTLED' AND oi.status IN ('OPEN','PARTIALLY_FILLED','ACKED') AND (EXISTS (SELECT 1 FROM market_updates mu WHERE mu.symbol=oi.symbol AND mu.status='MarketExpired' AND mu.timestamp<$1) OR (NOT EXISTS (SELECT 1 FROM market_updates mu_missing WHERE mu_missing.symbol=oi.symbol AND mu_missing.status='MarketExpired') AND (i.expiry*1000)<$1))").bind::<BigInt,_>(ub).get_result::<CR>(c).await?.count;
136    if count == 0 && ub > wm {
137        let _ = diesel::sql_query("INSERT INTO integrity_watermarks (key,watermark_ms,updated_at) VALUES ('orphaned_settlement_orders',$1,now()) ON CONFLICT (key) DO UPDATE SET watermark_ms=$1,updated_at=now()").bind::<BigInt,_>(ub).execute(c).await;
138    }
139    Ok(count)
140}
141
#[cfg(test)]
mod tests {
    use crate::test_helpers::TestDb;
    use diesel::prelude::*;
    use diesel::sql_query;
    use hypercall_db::*;
    use hypercall_types::wallet_address::test_wallet;
    use rust_decimal_macros::dec;

    /// With no data seeded, the fill-volume and settlement queries must
    /// still succeed.
    #[tokio::test]
    async fn integrity_query_results_on_empty_db() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let reader: &dyn IntegrityReader = &db;

        let outcome = reader.get_integrity_query_results().await;

        assert!(outcome.fill_volume.is_ok());
        assert!(outcome.settlement_stats.is_ok());
    }

    /// With no data seeded, settlement statistics are all zero.
    #[cfg(feature = "test-utils")]
    #[tokio::test]
    async fn integrity_settlement_stats_empty() {
        let test_db = TestDb::new().await.unwrap();
        let db = test_db.diesel_db().await;
        let reader: &dyn IntegrityReader = &db;

        // (total, applied, pending, total_value)
        let stats = reader.get_settlement_integrity().await.unwrap();

        assert_eq!(stats, (0, 0, 0, dec!(0)));
    }

    /// Two fills (one positive size, one negative) are counted and their
    /// notional is summed using the absolute size.
    #[tokio::test]
    async fn integrity_fill_volume_after_inserts() {
        let test_db = TestDb::new().await.unwrap();
        let wallet = test_wallet(60);

        // Seed statements, executed in order. The parent `trades` rows come
        // first because `fills` has a foreign key to `trades`.
        //
        // Fill 1: size = 1000000 (= 1 contract), price = 50000
        //         notional = ABS(size)/1000000 * price = 1 * 50000 = 50000
        // Fill 2: size = -2000000, price = 60000
        //         notional = 2 * 60000 = 120000
        let seed_statements = [
            "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp) \
             VALUES (1, 'BTC-25DEC26-100000-C', 50000, 1000000, $1, $1, 10, 10, 1000)",
            "INSERT INTO trades (trade_id, symbol, price, size, maker_address, taker_address, maker_fee, taker_fee, timestamp) \
             VALUES (2, 'BTC-25DEC26-100000-C', 60000, 2000000, $1, $1, 20, 20, 2000)",
            "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp) \
             VALUES (1, $1, 'BTC-25DEC26-100000-C', 50000, 1000000, 10, true, 1000)",
            "INSERT INTO fills (trade_id, wallet_address, symbol, price, size, fee, is_taker, timestamp) \
             VALUES (2, $1, 'BTC-25DEC26-100000-C', 60000, -2000000, 20, false, 2000)",
        ];

        {
            // Scoped so the connection is released before the reader is built.
            let mut conn = test_db.handler.pool().get().unwrap();
            for statement in seed_statements {
                sql_query(statement)
                    .bind::<diesel::sql_types::Binary, _>(&wallet)
                    .execute(&mut conn)
                    .unwrap();
            }
        }

        let db = test_db.diesel_db().await;
        let reader: &dyn IntegrityReader = &db;
        let outcome = reader.get_integrity_query_results().await;
        let (count, notional) = outcome.fill_volume.unwrap();

        assert_eq!(count, 2);
        // 1*50000 + 2*60000 = 50000 + 120000 = 170000
        assert_eq!(notional, dec!(170000));
    }
}