Skip to main content

hypercall_db_diesel/
settlements.rs

1//! SettlementReader + SettlementWriter implementations for DatabaseHandler.
2//!
3//! Handles idempotent settlement persistence: expiration rows, payout rows,
4//! ledger events, and settlement payout facts in a single atomic transaction.
5//! See CALL-500 post-mortem for why this atomicity is critical.
6
7use anyhow::Result;
8use diesel::prelude::*;
9use diesel::RunQueryDsl;
10use rust_decimal::Decimal;
11
12use hypercall_types::WalletAddress;
13
14use crate::database_handler::DatabaseHandler;
15
16impl hypercall_db::SettlementReader for DatabaseHandler {
17    fn is_settlement_ledger_applied_sync(
18        &self,
19        wallet: &WalletAddress,
20        symbol: &str,
21    ) -> Result<bool> {
22        use crate::schema::settlement_payouts::dsl as sp;
23
24        let mut conn = self.pool().get()?;
25        let expiry_ts = crate::settlement_ops::extract_expiry_ts(symbol)?;
26
27        let result: Option<bool> = sp::settlement_payouts
28            .filter(sp::wallet.eq(wallet))
29            .filter(sp::symbol.eq(symbol))
30            .filter(sp::expiry_ts.eq(expiry_ts))
31            .select(sp::ledger_applied)
32            .first(&mut conn)
33            .optional()?;
34
35        Ok(result.unwrap_or(false))
36    }
37
38    fn get_applied_settlement_symbols_sync(
39        &self,
40        wallet: &WalletAddress,
41        symbols: &[String],
42    ) -> Result<std::collections::HashSet<String>> {
43        use crate::schema::settlement_payouts::dsl as sp;
44
45        if symbols.is_empty() {
46            return Ok(std::collections::HashSet::new());
47        }
48
49        let mut conn = self.pool().get()?;
50        let settled_symbols = sp::settlement_payouts
51            .filter(sp::wallet.eq(wallet))
52            .filter(sp::ledger_applied.eq(true))
53            .filter(sp::symbol.eq_any(symbols))
54            .select(sp::symbol)
55            .load::<String>(&mut conn)?;
56
57        Ok(settled_symbols.into_iter().collect())
58    }
59
60    fn get_total_fill_volume_sync(&self) -> Result<(i64, Decimal)> {
61        let mut conn = self.pool().get()?;
62        crate::settlement_ops::query_total_fill_volume(&mut conn)
63    }
64}
65
66impl hypercall_db::SettlementWriter for DatabaseHandler {
67    fn try_apply_settlement_sync(
68        &self,
69        wallet: &WalletAddress,
70        symbol: &str,
71        position_size: Decimal,
72        settlement_price: Decimal,
73        settlement_value: Decimal,
74        margin_mode: hypercall_types::MarginMode,
75        event_ts_ms: i64,
76        settlement_entry_price: Option<Decimal>,
77        cost_basis: Option<Decimal>,
78        net_pnl: Option<Decimal>,
79    ) -> Result<hypercall_db::SettlementResult> {
80        use crate::models::{NewPositionExpiration, NewSettlementPayout};
81
82        let mut conn = self.pool().get()?;
83        let expiry_ts = crate::settlement_ops::extract_expiry_ts(symbol)?;
84        let context = format!("{}/{}", wallet, symbol);
85
86        let expected_value = position_size * settlement_price;
87        if settlement_value != expected_value {
88            anyhow::bail!(
89                "Settlement value mismatch for {}: expected {} (position_size={} * settlement_price={}), got {}",
90                context, expected_value, position_size, settlement_price, settlement_value
91            );
92        }
93
94        let (settlement_entry_price, cost_basis, net_pnl) =
95            crate::settlement_ops::validate_settlement_economics_tuple(
96                settlement_entry_price,
97                cost_basis,
98                net_pnl,
99                &context,
100            )?;
101        let ledger_delta = crate::settlement_ops::settlement_ledger_delta_for_margin_mode(
102            margin_mode,
103            settlement_value,
104            net_pnl,
105            &context,
106        )?;
107
108        let new_expiration = NewPositionExpiration {
109            wallet: *wallet,
110            symbol: symbol.to_string(),
111            expiry_ts,
112            settlement_price,
113            settlement_value,
114        };
115        let payout = NewSettlementPayout {
116            wallet: *wallet,
117            symbol: symbol.to_string(),
118            expiry_ts,
119            position_size,
120            settlement_price,
121            payout_amount: settlement_value,
122            ledger_applied: true,
123            settlement_entry_price,
124            cost_basis,
125            net_pnl,
126        };
127
128        let outcome = conn.transaction::<_, anyhow::Error, _>(|tx_conn| {
129            crate::settlement_ops::claim_settlement_in_tx(
130                tx_conn,
131                &new_expiration,
132                &payout,
133                ledger_delta,
134                event_ts_ms,
135            )
136        })?;
137
138        if !outcome.newly_persisted {
139            tracing::info!(
140                "Settlement already applied for {}/{}, skipping (idempotency)",
141                wallet,
142                symbol
143            );
144        } else {
145            tracing::info!(
146                "Settlement recorded: wallet={}, symbol={}, size={}, payout={}",
147                wallet,
148                symbol,
149                position_size,
150                settlement_value
151            );
152        }
153
154        Ok(outcome)
155    }
156
157    fn observe_applied_settlement_sync(
158        &self,
159        wallet: &WalletAddress,
160        symbol: &str,
161        position_size: Decimal,
162        settlement_price: Decimal,
163        settlement_value: Decimal,
164        margin_mode: hypercall_types::MarginMode,
165        settlement_entry_price: Option<Decimal>,
166        cost_basis: Option<Decimal>,
167        net_pnl: Option<Decimal>,
168    ) -> Result<hypercall_db::SettlementResult> {
169        use crate::models::{NewPositionExpiration, NewSettlementPayout};
170
171        let mut conn = self.pool().get()?;
172        let expiry_ts = crate::settlement_ops::extract_expiry_ts(symbol)?;
173        let context = format!("{}/{}", wallet, symbol);
174
175        let expected_value = position_size * settlement_price;
176        if settlement_value != expected_value {
177            anyhow::bail!(
178                "Settlement value mismatch for {}: expected {} (position_size={} * settlement_price={}), got {}",
179                context, expected_value, position_size, settlement_price, settlement_value
180            );
181        }
182
183        let (settlement_entry_price, cost_basis, net_pnl) =
184            crate::settlement_ops::validate_settlement_economics_tuple(
185                settlement_entry_price,
186                cost_basis,
187                net_pnl,
188                &context,
189            )?;
190        let ledger_delta = crate::settlement_ops::settlement_ledger_delta_for_margin_mode(
191            margin_mode,
192            settlement_value,
193            net_pnl,
194            &context,
195        )?;
196
197        let expiration = NewPositionExpiration {
198            wallet: *wallet,
199            symbol: symbol.to_string(),
200            expiry_ts,
201            settlement_price,
202            settlement_value,
203        };
204
205        let payout = NewSettlementPayout {
206            wallet: *wallet,
207            symbol: symbol.to_string(),
208            expiry_ts,
209            position_size,
210            settlement_price,
211            payout_amount: settlement_value,
212            ledger_applied: true,
213            settlement_entry_price,
214            cost_basis,
215            net_pnl,
216        };
217
218        conn.transaction::<_, anyhow::Error, _>(|tx_conn| {
219            crate::settlement_ops::observe_applied_settlement_in_tx(
220                tx_conn,
221                &expiration,
222                &payout,
223                ledger_delta,
224            )
225        })
226    }
227}
228
#[cfg(test)]
mod tests {
    use crate::test_helpers::TestDb;
    use hypercall_db::*;
    use hypercall_types::wallet_address::test_wallet;
    use hypercall_types::MarginMode;
    use rust_decimal_macros::dec;

    /// Symbol shared by every test in this module.
    const TEST_SYMBOL: &str = "BTC-20260131-100000-C";

    /// A fresh database reports no applied settlement for an unknown wallet/symbol.
    #[tokio::test]
    async fn settlement_not_applied_on_empty_db() {
        let test_db = TestDb::new().await.unwrap();
        let handler = test_db.handler.as_ref();
        let wallet = test_wallet(20);

        let is_applied = handler
            .is_settlement_ledger_applied_sync(&wallet, TEST_SYMBOL)
            .unwrap();

        assert!(!is_applied);
    }

    /// A fresh database returns an empty set of applied symbols.
    #[tokio::test]
    async fn settlement_applied_symbols_empty() {
        let test_db = TestDb::new().await.unwrap();
        let handler = test_db.handler.as_ref();
        let wallet = test_wallet(21);
        let requested = vec![TEST_SYMBOL.to_string()];

        let applied_set = handler
            .get_applied_settlement_symbols_sync(&wallet, &requested)
            .unwrap();

        assert!(applied_set.is_empty());
    }

    /// The first apply persists the settlement; an identical replay is a no-op.
    #[tokio::test]
    async fn settlement_try_apply_roundtrip() {
        let test_db = TestDb::new().await.unwrap();
        let handler = test_db.handler.as_ref();
        let wallet = test_wallet(23);

        // Both attempts use exactly the same arguments.
        let apply_once = || {
            handler
                .try_apply_settlement_sync(
                    &wallet,
                    TEST_SYMBOL,
                    dec!(1),
                    dec!(5000),
                    dec!(5000),
                    MarginMode::Standard,
                    1700000000000,
                    Some(dec!(95000)),
                    Some(dec!(95000)),
                    Some(dec!(5000)),
                )
                .unwrap()
        };

        let first = apply_once();
        assert!(first.newly_persisted);

        let is_applied = handler
            .is_settlement_ledger_applied_sync(&wallet, TEST_SYMBOL)
            .unwrap();
        assert!(is_applied);

        // Idempotent replay
        let replay = apply_once();
        assert!(!replay.newly_persisted);
    }

    /// A fresh database reports zero fills and zero volume.
    #[tokio::test]
    async fn settlement_total_fill_volume_empty() {
        let test_db = TestDb::new().await.unwrap();
        let handler = test_db.handler.as_ref();

        let (fill_count, fill_volume) = handler.get_total_fill_volume_sync().unwrap();

        assert_eq!(fill_count, 0);
        assert_eq!(fill_volume, dec!(0));
    }
}