Skip to main content

hypercall_db_diesel/
settlement_ops.rs

//! Settlement persistence operations.
//!
//! Free functions that take `&mut PgConnection` so they can be called
//! from any context (`DatabaseHandler` trait impl, transaction closures, tests).
//! They previously lived as private methods on `DieselEventHandler`.
6
7use anyhow::{anyhow, Context, Result};
8use diesel::prelude::*;
9use diesel::sql_types::{BigInt, Binary, Nullable, Numeric, Text};
10use hypercall_types::{MarginMode, WalletAddress};
11use rust_decimal::Decimal;
12
13use crate::models::{NewPositionExpiration, NewSettlementPayout, SettlementPayout};
14use crate::schema;
15
16/// Parse a symbol string and extract its expiry timestamp.
17pub fn extract_expiry_ts(symbol: &str) -> Result<i64> {
18    hypercall_settlement::SettlementInstrument::from_symbol(symbol)
19        .map(|instrument| instrument.expiry_ts)
20        .map_err(|e| anyhow!("Failed to parse symbol '{}': {}", symbol, e))
21}
22
23/// Validate settlement economics (entry price, cost basis, net PnL) and return as a tuple.
24pub fn validate_settlement_economics_tuple(
25    settlement_entry_price: Option<Decimal>,
26    cost_basis: Option<Decimal>,
27    net_pnl: Option<Decimal>,
28    context: &str,
29) -> Result<(Option<Decimal>, Option<Decimal>, Option<Decimal>)> {
30    hypercall_settlement::validate_settlement_economics(
31        settlement_entry_price,
32        cost_basis,
33        net_pnl,
34        context,
35    )
36    .map(|economics| {
37        economics
38            .map(|economics| {
39                (
40                    Some(economics.settlement_entry_price),
41                    Some(economics.cost_basis),
42                    Some(economics.net_pnl),
43                )
44            })
45            .unwrap_or((None, None, None))
46    })
47    .map_err(|e| anyhow!("{}", e))
48}
49
50/// Compute the ledger cash delta for a settlement based on margin mode.
51/// Standard mode uses settlement_value; portfolio mode uses net_pnl.
52pub fn settlement_ledger_delta_for_margin_mode(
53    margin_mode: MarginMode,
54    settlement_value: Decimal,
55    net_pnl: Option<Decimal>,
56    context: &str,
57) -> Result<Decimal> {
58    hypercall_settlement::settlement_cash_delta_for_margin_mode(
59        margin_mode,
60        settlement_value,
61        net_pnl,
62        context,
63    )
64    .map_err(|e| anyhow!("{}", e))
65}
66
67/// Idempotent settlement claim: inserts expiration + payout + ledger event
68/// in one transaction. Panics on invariant violations (data corruption).
69/// Returns whether this was a new claim or a duplicate replay.
70pub fn claim_settlement_in_tx(
71    conn: &mut PgConnection,
72    expiration: &NewPositionExpiration,
73    payout: &NewSettlementPayout,
74    ledger_delta: Decimal,
75    event_ts_ms: i64,
76) -> Result<hypercall_db::SettlementResult> {
77    let expiration_rows = diesel::insert_into(schema::position_expirations::table)
78        .values(expiration)
79        .on_conflict((
80            schema::position_expirations::wallet,
81            schema::position_expirations::symbol,
82            schema::position_expirations::expiry_ts,
83        ))
84        .do_nothing()
85        .execute(conn)?;
86
87    let expiration_inserted = expiration_rows > 0;
88    let existing_payout = load_settlement_payout_in_tx(conn, payout)?;
89
90    if expiration_inserted {
91        if let Some(existing) = existing_payout {
92            panic!(
93                "Partial settlement payout for {}/{} expiry {}: payout id={} existed before expiration claim",
94                payout.wallet, payout.symbol, payout.expiry_ts, existing.id
95            );
96        }
97        insert_settlement_payout_in_tx(conn, payout)?;
98        let ledger_event_inserted = insert_settlement_ledger_event_in_tx(
99            conn,
100            &expiration.wallet,
101            &expiration.symbol,
102            ledger_delta,
103            event_ts_ms,
104        )?;
105        if ledger_delta != Decimal::ZERO && !ledger_event_inserted {
106            panic!(
107                "Partial settlement ledger for {}/{}: nonzero settlement ledger event already existed before expiration claim",
108                expiration.wallet, expiration.symbol
109            );
110        }
111    } else {
112        lock_and_validate_existing_settlement_claim_in_tx(conn, expiration)?;
113        let Some(existing) = existing_payout else {
114            panic!(
115                "Partial settlement claim for {}/{} expiry {}: expiration row exists without payout",
116                payout.wallet, payout.symbol, payout.expiry_ts
117            );
118        };
119        validate_existing_settlement_payout_facts(&existing, payout);
120        if existing.ledger_applied {
121            validate_existing_settlement_ledger_in_tx(
122                conn,
123                &expiration.wallet,
124                &expiration.symbol,
125                ledger_delta,
126            )?;
127        } else {
128            tracing::warn!(
129                wallet = %expiration.wallet,
130                symbol = %expiration.symbol,
131                expiry_ts = expiration.expiry_ts,
132                payout_id = existing.id,
133                "Repairing legacy settlement payout with ledger_applied=false"
134            );
135            let ledger_event_inserted = insert_settlement_ledger_event_in_tx(
136                conn,
137                &expiration.wallet,
138                &expiration.symbol,
139                ledger_delta,
140                event_ts_ms,
141            )?;
142            if ledger_delta != Decimal::ZERO && !ledger_event_inserted {
143                panic!(
144                    "Partial legacy settlement payout for {}/{} expiry {}: existing id={} has ledger_applied=false but settlement ledger event already exists",
145                    payout.wallet, payout.symbol, payout.expiry_ts, existing.id
146                );
147            }
148            mark_settlement_payout_applied_in_tx(conn, &existing)?;
149        }
150    }
151
152    Ok(hypercall_db::SettlementResult {
153        newly_persisted: expiration_inserted,
154    })
155}
156
157fn insert_settlement_ledger_event_in_tx(
158    conn: &mut PgConnection,
159    wallet: &WalletAddress,
160    symbol: &str,
161    ledger_delta: Decimal,
162    event_ts_ms: i64,
163) -> Result<bool> {
164    if ledger_delta == Decimal::ZERO {
165        return Ok(false);
166    }
167
168    let rows_inserted = diesel::sql_query(
169        "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type, reference_symbol)
170          VALUES ($1, $2, $3, $4, $5)
171          ON CONFLICT (wallet, reference_symbol, event_type)
172          WHERE reference_symbol IS NOT NULL AND event_type = 'settlement_realized_pnl'
173          DO NOTHING",
174    )
175    .bind::<Binary, _>(wallet)
176    .bind::<BigInt, _>(event_ts_ms)
177    .bind::<Numeric, _>(ledger_delta)
178    .bind::<Text, _>("settlement_realized_pnl")
179    .bind::<Nullable<Text>, _>(Some(symbol))
180    .execute(conn)?;
181
182    Ok(rows_inserted > 0)
183}
184
185fn lock_and_validate_existing_settlement_claim_in_tx(
186    conn: &mut PgConnection,
187    expiration: &NewPositionExpiration,
188) -> Result<()> {
189    #[derive(QueryableByName)]
190    struct LockedSettlementClaim {
191        #[diesel(sql_type = BigInt)]
192        id: i64,
193        #[diesel(sql_type = Numeric)]
194        settlement_price: Decimal,
195        #[diesel(sql_type = Numeric)]
196        settlement_value: Decimal,
197    }
198
199    let claim = diesel::sql_query(
200        "SELECT id, settlement_price, settlement_value
201         FROM position_expirations
202         WHERE wallet = $1 AND symbol = $2 AND expiry_ts = $3
203         FOR UPDATE",
204    )
205    .bind::<Binary, _>(&expiration.wallet)
206    .bind::<Text, _>(&expiration.symbol)
207    .bind::<BigInt, _>(expiration.expiry_ts)
208    .get_result::<LockedSettlementClaim>(conn)
209    .with_context(|| {
210        format!(
211            "Failed to lock existing settlement claim for {}/{} expiry {}",
212            expiration.wallet, expiration.symbol, expiration.expiry_ts
213        )
214    })?;
215
216    if claim.settlement_price != expiration.settlement_price
217        || claim.settlement_value != expiration.settlement_value
218    {
219        panic!(
220            "Settlement claim mismatch for {}/{} expiry {}: existing id={} price={} value={}, replay price={} value={}",
221            expiration.wallet, expiration.symbol, expiration.expiry_ts,
222            claim.id, claim.settlement_price, claim.settlement_value,
223            expiration.settlement_price, expiration.settlement_value
224        );
225    }
226
227    Ok(())
228}
229
230fn load_settlement_payout_in_tx(
231    conn: &mut PgConnection,
232    payout: &NewSettlementPayout,
233) -> Result<Option<SettlementPayout>> {
234    use schema::settlement_payouts::dsl as sp;
235
236    sp::settlement_payouts
237        .filter(sp::wallet.eq(&payout.wallet))
238        .filter(sp::symbol.eq(&payout.symbol))
239        .filter(sp::expiry_ts.eq(payout.expiry_ts))
240        .select(SettlementPayout::as_select())
241        .first(conn)
242        .optional()
243        .map_err(Into::into)
244}
245
246fn validate_existing_settlement_payout_facts(
247    existing: &SettlementPayout,
248    payout: &NewSettlementPayout,
249) {
250    if existing.position_size != payout.position_size
251        || existing.settlement_price != payout.settlement_price
252        || existing.payout_amount != payout.payout_amount
253        || existing.settlement_entry_price != payout.settlement_entry_price
254        || existing.cost_basis != payout.cost_basis
255        || existing.net_pnl != payout.net_pnl
256    {
257        panic!(
258            "Settlement payout mismatch for {}/{} expiry {}: existing id={} size={} price={} payout={} entry={:?} cost_basis={:?} net_pnl={:?}, replay size={} price={} payout={} entry={:?} cost_basis={:?} net_pnl={:?}",
259            payout.wallet, payout.symbol, payout.expiry_ts,
260            existing.id, existing.position_size, existing.settlement_price,
261            existing.payout_amount, existing.settlement_entry_price, existing.cost_basis, existing.net_pnl,
262            payout.position_size, payout.settlement_price, payout.payout_amount,
263            payout.settlement_entry_price, payout.cost_basis, payout.net_pnl
264        );
265    }
266}
267
268fn mark_settlement_payout_applied_in_tx(
269    conn: &mut PgConnection,
270    existing: &SettlementPayout,
271) -> Result<()> {
272    use schema::settlement_payouts::dsl as sp;
273
274    let rows = diesel::update(
275        sp::settlement_payouts
276            .filter(sp::id.eq(existing.id))
277            .filter(sp::ledger_applied.eq(false)),
278    )
279    .set(sp::ledger_applied.eq(true))
280    .execute(conn)?;
281
282    if rows != 1 {
283        panic!(
284            "Settlement payout repair race for {}/{} expiry {}: expected to mark existing id={} applied, updated {} rows",
285            existing.wallet, existing.symbol, existing.expiry_ts, existing.id, rows
286        );
287    }
288
289    Ok(())
290}
291
292fn insert_settlement_payout_in_tx(
293    conn: &mut PgConnection,
294    payout: &NewSettlementPayout,
295) -> Result<()> {
296    diesel::insert_into(schema::settlement_payouts::table)
297        .values(payout)
298        .execute(conn)?;
299    Ok(())
300}
301
302fn validate_existing_settlement_ledger_in_tx(
303    conn: &mut PgConnection,
304    wallet: &WalletAddress,
305    symbol: &str,
306    ledger_delta: Decimal,
307) -> Result<()> {
308    #[derive(QueryableByName)]
309    struct SettlementLedgerEvent {
310        #[diesel(sql_type = Numeric)]
311        delta: Decimal,
312    }
313
314    let event = diesel::sql_query(
315        "SELECT delta
316         FROM ledger_events
317         WHERE wallet = $1
318           AND reference_symbol = $2
319           AND event_type = 'settlement_realized_pnl'
320         FOR UPDATE",
321    )
322    .bind::<Binary, _>(wallet)
323    .bind::<Nullable<Text>, _>(Some(symbol))
324    .load::<SettlementLedgerEvent>(conn)?;
325    let deltas = event
326        .iter()
327        .map(|existing| existing.delta)
328        .collect::<Vec<_>>();
329
330    validate_settlement_ledger_deltas(wallet, symbol, ledger_delta, &deltas)
331}
332
333fn validate_settlement_ledger_deltas(
334    wallet: &WalletAddress,
335    symbol: &str,
336    ledger_delta: Decimal,
337    deltas: &[Decimal],
338) -> Result<()> {
339    match (ledger_delta == Decimal::ZERO, deltas) {
340        (true, []) => Ok(()),
341        (true, [existing_delta]) => {
342            panic!(
343                "Settlement ledger mismatch for {}/{}: expected no zero-delta ledger event, found delta={}",
344                wallet, symbol, existing_delta
345            );
346        }
347        (false, [existing_delta]) if *existing_delta == ledger_delta => Ok(()),
348        (false, [existing_delta]) => {
349            panic!(
350                "Settlement ledger mismatch for {}/{}: existing delta={}, replay delta={}",
351                wallet, symbol, existing_delta, ledger_delta
352            );
353        }
354        (false, []) => {
355            panic!(
356                "Partial settlement ledger for {}/{}: nonzero settlement has no ledger event",
357                wallet, symbol
358            );
359        }
360        (_, _) => {
361            panic!(
362                "Settlement ledger invariant violation for {}/{}: multiple settlement ledger events found",
363                wallet, symbol
364            );
365        }
366    }
367}
368
369fn validate_existing_settlement_claim_readonly(
370    conn: &mut PgConnection,
371    expiration: &NewPositionExpiration,
372) -> Result<()> {
373    #[derive(QueryableByName)]
374    struct ExistingSettlementClaim {
375        #[diesel(sql_type = BigInt)]
376        id: i64,
377        #[diesel(sql_type = Numeric)]
378        settlement_price: Decimal,
379        #[diesel(sql_type = Numeric)]
380        settlement_value: Decimal,
381    }
382
383    let claim = diesel::sql_query(
384        "SELECT id, settlement_price, settlement_value
385         FROM position_expirations
386         WHERE wallet = $1 AND symbol = $2 AND expiry_ts = $3",
387    )
388    .bind::<Binary, _>(&expiration.wallet)
389    .bind::<Text, _>(&expiration.symbol)
390    .bind::<BigInt, _>(expiration.expiry_ts)
391    .get_result::<ExistingSettlementClaim>(conn)
392    .with_context(|| {
393        format!(
394            "Failed to load existing settlement claim for {}/{} expiry {}",
395            expiration.wallet, expiration.symbol, expiration.expiry_ts
396        )
397    })?;
398
399    if claim.settlement_price != expiration.settlement_price
400        || claim.settlement_value != expiration.settlement_value
401    {
402        panic!(
403            "Settlement claim mismatch for {}/{} expiry {}: existing id={} price={} value={}, replay price={} value={}",
404            expiration.wallet,
405            expiration.symbol,
406            expiration.expiry_ts,
407            claim.id,
408            claim.settlement_price,
409            claim.settlement_value,
410            expiration.settlement_price,
411            expiration.settlement_value
412        );
413    }
414
415    Ok(())
416}
417
418fn validate_existing_settlement_ledger_readonly(
419    conn: &mut PgConnection,
420    wallet: &WalletAddress,
421    symbol: &str,
422    ledger_delta: Decimal,
423) -> Result<()> {
424    #[derive(QueryableByName)]
425    struct SettlementLedgerEvent {
426        #[diesel(sql_type = Numeric)]
427        delta: Decimal,
428    }
429
430    let event = diesel::sql_query(
431        "SELECT delta
432         FROM ledger_events
433         WHERE wallet = $1
434           AND reference_symbol = $2
435           AND event_type = 'settlement_realized_pnl'",
436    )
437    .bind::<Binary, _>(wallet)
438    .bind::<Nullable<Text>, _>(Some(symbol))
439    .load::<SettlementLedgerEvent>(conn)?;
440    let deltas = event
441        .iter()
442        .map(|existing| existing.delta)
443        .collect::<Vec<_>>();
444
445    validate_settlement_ledger_deltas(wallet, symbol, ledger_delta, &deltas)
446}
447
448/// Read-only validation of an already-applied settlement (standby replay path).
449/// Panics on any inconsistency between the replay and the persisted state.
450pub fn observe_applied_settlement_in_tx(
451    conn: &mut PgConnection,
452    expiration: &NewPositionExpiration,
453    payout: &NewSettlementPayout,
454    ledger_delta: Decimal,
455) -> Result<hypercall_db::SettlementResult> {
456    validate_existing_settlement_claim_readonly(conn, expiration)?;
457    let Some(existing) = load_settlement_payout_in_tx(conn, payout)? else {
458        panic!(
459            "Partial settlement claim for {}/{} expiry {}: expiration row exists without payout",
460            payout.wallet, payout.symbol, payout.expiry_ts
461        );
462    };
463    validate_existing_settlement_payout_facts(&existing, payout);
464    if !existing.ledger_applied {
465        panic!(
466            "Partial settlement payout for {}/{} expiry {}: payout id={} exists but ledger_applied=false during standby replay",
467            payout.wallet, payout.symbol, payout.expiry_ts, existing.id
468        );
469    }
470    validate_existing_settlement_ledger_readonly(
471        conn,
472        &expiration.wallet,
473        &expiration.symbol,
474        ledger_delta,
475    )?;
476    Ok(hypercall_db::SettlementResult {
477        newly_persisted: false,
478    })
479}
480
481/// Total fill count and notional volume across all fills.
482pub fn query_total_fill_volume(conn: &mut PgConnection) -> Result<(i64, Decimal)> {
483    #[derive(QueryableByName)]
484    struct Row {
485        #[diesel(sql_type = BigInt)]
486        fill_count: i64,
487        #[diesel(sql_type = Numeric)]
488        total_notional: Decimal,
489    }
490
491    let result = diesel::sql_query(
492        "SELECT COUNT(*)::bigint AS fill_count, \
493                COALESCE(SUM(ABS(size) / 1000000.0 * price), 0)::numeric AS total_notional \
494         FROM fills",
495    )
496    .get_result::<Row>(conn)?;
497    Ok((result.fill_count, result.total_notional))
498}