1use anyhow::{anyhow, Context, Result};
8use diesel::prelude::*;
9use diesel::sql_types::{BigInt, Binary, Nullable, Numeric, Text};
10use hypercall_types::{MarginMode, WalletAddress};
11use rust_decimal::Decimal;
12
13use crate::models::{NewPositionExpiration, NewSettlementPayout, SettlementPayout};
14use crate::schema;
15
16pub fn extract_expiry_ts(symbol: &str) -> Result<i64> {
18 hypercall_settlement::SettlementInstrument::from_symbol(symbol)
19 .map(|instrument| instrument.expiry_ts)
20 .map_err(|e| anyhow!("Failed to parse symbol '{}': {}", symbol, e))
21}
22
23pub fn validate_settlement_economics_tuple(
25 settlement_entry_price: Option<Decimal>,
26 cost_basis: Option<Decimal>,
27 net_pnl: Option<Decimal>,
28 context: &str,
29) -> Result<(Option<Decimal>, Option<Decimal>, Option<Decimal>)> {
30 hypercall_settlement::validate_settlement_economics(
31 settlement_entry_price,
32 cost_basis,
33 net_pnl,
34 context,
35 )
36 .map(|economics| {
37 economics
38 .map(|economics| {
39 (
40 Some(economics.settlement_entry_price),
41 Some(economics.cost_basis),
42 Some(economics.net_pnl),
43 )
44 })
45 .unwrap_or((None, None, None))
46 })
47 .map_err(|e| anyhow!("{}", e))
48}
49
50pub fn settlement_ledger_delta_for_margin_mode(
53 margin_mode: MarginMode,
54 settlement_value: Decimal,
55 net_pnl: Option<Decimal>,
56 context: &str,
57) -> Result<Decimal> {
58 hypercall_settlement::settlement_cash_delta_for_margin_mode(
59 margin_mode,
60 settlement_value,
61 net_pnl,
62 context,
63 )
64 .map_err(|e| anyhow!("{}", e))
65}
66
67pub fn claim_settlement_in_tx(
71 conn: &mut PgConnection,
72 expiration: &NewPositionExpiration,
73 payout: &NewSettlementPayout,
74 ledger_delta: Decimal,
75 event_ts_ms: i64,
76) -> Result<hypercall_db::SettlementResult> {
77 let expiration_rows = diesel::insert_into(schema::position_expirations::table)
78 .values(expiration)
79 .on_conflict((
80 schema::position_expirations::wallet,
81 schema::position_expirations::symbol,
82 schema::position_expirations::expiry_ts,
83 ))
84 .do_nothing()
85 .execute(conn)?;
86
87 let expiration_inserted = expiration_rows > 0;
88 let existing_payout = load_settlement_payout_in_tx(conn, payout)?;
89
90 if expiration_inserted {
91 if let Some(existing) = existing_payout {
92 panic!(
93 "Partial settlement payout for {}/{} expiry {}: payout id={} existed before expiration claim",
94 payout.wallet, payout.symbol, payout.expiry_ts, existing.id
95 );
96 }
97 insert_settlement_payout_in_tx(conn, payout)?;
98 let ledger_event_inserted = insert_settlement_ledger_event_in_tx(
99 conn,
100 &expiration.wallet,
101 &expiration.symbol,
102 ledger_delta,
103 event_ts_ms,
104 )?;
105 if ledger_delta != Decimal::ZERO && !ledger_event_inserted {
106 panic!(
107 "Partial settlement ledger for {}/{}: nonzero settlement ledger event already existed before expiration claim",
108 expiration.wallet, expiration.symbol
109 );
110 }
111 } else {
112 lock_and_validate_existing_settlement_claim_in_tx(conn, expiration)?;
113 let Some(existing) = existing_payout else {
114 panic!(
115 "Partial settlement claim for {}/{} expiry {}: expiration row exists without payout",
116 payout.wallet, payout.symbol, payout.expiry_ts
117 );
118 };
119 validate_existing_settlement_payout_facts(&existing, payout);
120 if existing.ledger_applied {
121 validate_existing_settlement_ledger_in_tx(
122 conn,
123 &expiration.wallet,
124 &expiration.symbol,
125 ledger_delta,
126 )?;
127 } else {
128 tracing::warn!(
129 wallet = %expiration.wallet,
130 symbol = %expiration.symbol,
131 expiry_ts = expiration.expiry_ts,
132 payout_id = existing.id,
133 "Repairing legacy settlement payout with ledger_applied=false"
134 );
135 let ledger_event_inserted = insert_settlement_ledger_event_in_tx(
136 conn,
137 &expiration.wallet,
138 &expiration.symbol,
139 ledger_delta,
140 event_ts_ms,
141 )?;
142 if ledger_delta != Decimal::ZERO && !ledger_event_inserted {
143 panic!(
144 "Partial legacy settlement payout for {}/{} expiry {}: existing id={} has ledger_applied=false but settlement ledger event already exists",
145 payout.wallet, payout.symbol, payout.expiry_ts, existing.id
146 );
147 }
148 mark_settlement_payout_applied_in_tx(conn, &existing)?;
149 }
150 }
151
152 Ok(hypercall_db::SettlementResult {
153 newly_persisted: expiration_inserted,
154 })
155}
156
157fn insert_settlement_ledger_event_in_tx(
158 conn: &mut PgConnection,
159 wallet: &WalletAddress,
160 symbol: &str,
161 ledger_delta: Decimal,
162 event_ts_ms: i64,
163) -> Result<bool> {
164 if ledger_delta == Decimal::ZERO {
165 return Ok(false);
166 }
167
168 let rows_inserted = diesel::sql_query(
169 "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type, reference_symbol)
170 VALUES ($1, $2, $3, $4, $5)
171 ON CONFLICT (wallet, reference_symbol, event_type)
172 WHERE reference_symbol IS NOT NULL AND event_type = 'settlement_realized_pnl'
173 DO NOTHING",
174 )
175 .bind::<Binary, _>(wallet)
176 .bind::<BigInt, _>(event_ts_ms)
177 .bind::<Numeric, _>(ledger_delta)
178 .bind::<Text, _>("settlement_realized_pnl")
179 .bind::<Nullable<Text>, _>(Some(symbol))
180 .execute(conn)?;
181
182 Ok(rows_inserted > 0)
183}
184
185fn lock_and_validate_existing_settlement_claim_in_tx(
186 conn: &mut PgConnection,
187 expiration: &NewPositionExpiration,
188) -> Result<()> {
189 #[derive(QueryableByName)]
190 struct LockedSettlementClaim {
191 #[diesel(sql_type = BigInt)]
192 id: i64,
193 #[diesel(sql_type = Numeric)]
194 settlement_price: Decimal,
195 #[diesel(sql_type = Numeric)]
196 settlement_value: Decimal,
197 }
198
199 let claim = diesel::sql_query(
200 "SELECT id, settlement_price, settlement_value
201 FROM position_expirations
202 WHERE wallet = $1 AND symbol = $2 AND expiry_ts = $3
203 FOR UPDATE",
204 )
205 .bind::<Binary, _>(&expiration.wallet)
206 .bind::<Text, _>(&expiration.symbol)
207 .bind::<BigInt, _>(expiration.expiry_ts)
208 .get_result::<LockedSettlementClaim>(conn)
209 .with_context(|| {
210 format!(
211 "Failed to lock existing settlement claim for {}/{} expiry {}",
212 expiration.wallet, expiration.symbol, expiration.expiry_ts
213 )
214 })?;
215
216 if claim.settlement_price != expiration.settlement_price
217 || claim.settlement_value != expiration.settlement_value
218 {
219 panic!(
220 "Settlement claim mismatch for {}/{} expiry {}: existing id={} price={} value={}, replay price={} value={}",
221 expiration.wallet, expiration.symbol, expiration.expiry_ts,
222 claim.id, claim.settlement_price, claim.settlement_value,
223 expiration.settlement_price, expiration.settlement_value
224 );
225 }
226
227 Ok(())
228}
229
230fn load_settlement_payout_in_tx(
231 conn: &mut PgConnection,
232 payout: &NewSettlementPayout,
233) -> Result<Option<SettlementPayout>> {
234 use schema::settlement_payouts::dsl as sp;
235
236 sp::settlement_payouts
237 .filter(sp::wallet.eq(&payout.wallet))
238 .filter(sp::symbol.eq(&payout.symbol))
239 .filter(sp::expiry_ts.eq(payout.expiry_ts))
240 .select(SettlementPayout::as_select())
241 .first(conn)
242 .optional()
243 .map_err(Into::into)
244}
245
246fn validate_existing_settlement_payout_facts(
247 existing: &SettlementPayout,
248 payout: &NewSettlementPayout,
249) {
250 if existing.position_size != payout.position_size
251 || existing.settlement_price != payout.settlement_price
252 || existing.payout_amount != payout.payout_amount
253 || existing.settlement_entry_price != payout.settlement_entry_price
254 || existing.cost_basis != payout.cost_basis
255 || existing.net_pnl != payout.net_pnl
256 {
257 panic!(
258 "Settlement payout mismatch for {}/{} expiry {}: existing id={} size={} price={} payout={} entry={:?} cost_basis={:?} net_pnl={:?}, replay size={} price={} payout={} entry={:?} cost_basis={:?} net_pnl={:?}",
259 payout.wallet, payout.symbol, payout.expiry_ts,
260 existing.id, existing.position_size, existing.settlement_price,
261 existing.payout_amount, existing.settlement_entry_price, existing.cost_basis, existing.net_pnl,
262 payout.position_size, payout.settlement_price, payout.payout_amount,
263 payout.settlement_entry_price, payout.cost_basis, payout.net_pnl
264 );
265 }
266}
267
268fn mark_settlement_payout_applied_in_tx(
269 conn: &mut PgConnection,
270 existing: &SettlementPayout,
271) -> Result<()> {
272 use schema::settlement_payouts::dsl as sp;
273
274 let rows = diesel::update(
275 sp::settlement_payouts
276 .filter(sp::id.eq(existing.id))
277 .filter(sp::ledger_applied.eq(false)),
278 )
279 .set(sp::ledger_applied.eq(true))
280 .execute(conn)?;
281
282 if rows != 1 {
283 panic!(
284 "Settlement payout repair race for {}/{} expiry {}: expected to mark existing id={} applied, updated {} rows",
285 existing.wallet, existing.symbol, existing.expiry_ts, existing.id, rows
286 );
287 }
288
289 Ok(())
290}
291
292fn insert_settlement_payout_in_tx(
293 conn: &mut PgConnection,
294 payout: &NewSettlementPayout,
295) -> Result<()> {
296 diesel::insert_into(schema::settlement_payouts::table)
297 .values(payout)
298 .execute(conn)?;
299 Ok(())
300}
301
302fn validate_existing_settlement_ledger_in_tx(
303 conn: &mut PgConnection,
304 wallet: &WalletAddress,
305 symbol: &str,
306 ledger_delta: Decimal,
307) -> Result<()> {
308 #[derive(QueryableByName)]
309 struct SettlementLedgerEvent {
310 #[diesel(sql_type = Numeric)]
311 delta: Decimal,
312 }
313
314 let event = diesel::sql_query(
315 "SELECT delta
316 FROM ledger_events
317 WHERE wallet = $1
318 AND reference_symbol = $2
319 AND event_type = 'settlement_realized_pnl'
320 FOR UPDATE",
321 )
322 .bind::<Binary, _>(wallet)
323 .bind::<Nullable<Text>, _>(Some(symbol))
324 .load::<SettlementLedgerEvent>(conn)?;
325 let deltas = event
326 .iter()
327 .map(|existing| existing.delta)
328 .collect::<Vec<_>>();
329
330 validate_settlement_ledger_deltas(wallet, symbol, ledger_delta, &deltas)
331}
332
333fn validate_settlement_ledger_deltas(
334 wallet: &WalletAddress,
335 symbol: &str,
336 ledger_delta: Decimal,
337 deltas: &[Decimal],
338) -> Result<()> {
339 match (ledger_delta == Decimal::ZERO, deltas) {
340 (true, []) => Ok(()),
341 (true, [existing_delta]) => {
342 panic!(
343 "Settlement ledger mismatch for {}/{}: expected no zero-delta ledger event, found delta={}",
344 wallet, symbol, existing_delta
345 );
346 }
347 (false, [existing_delta]) if *existing_delta == ledger_delta => Ok(()),
348 (false, [existing_delta]) => {
349 panic!(
350 "Settlement ledger mismatch for {}/{}: existing delta={}, replay delta={}",
351 wallet, symbol, existing_delta, ledger_delta
352 );
353 }
354 (false, []) => {
355 panic!(
356 "Partial settlement ledger for {}/{}: nonzero settlement has no ledger event",
357 wallet, symbol
358 );
359 }
360 (_, _) => {
361 panic!(
362 "Settlement ledger invariant violation for {}/{}: multiple settlement ledger events found",
363 wallet, symbol
364 );
365 }
366 }
367}
368
369fn validate_existing_settlement_claim_readonly(
370 conn: &mut PgConnection,
371 expiration: &NewPositionExpiration,
372) -> Result<()> {
373 #[derive(QueryableByName)]
374 struct ExistingSettlementClaim {
375 #[diesel(sql_type = BigInt)]
376 id: i64,
377 #[diesel(sql_type = Numeric)]
378 settlement_price: Decimal,
379 #[diesel(sql_type = Numeric)]
380 settlement_value: Decimal,
381 }
382
383 let claim = diesel::sql_query(
384 "SELECT id, settlement_price, settlement_value
385 FROM position_expirations
386 WHERE wallet = $1 AND symbol = $2 AND expiry_ts = $3",
387 )
388 .bind::<Binary, _>(&expiration.wallet)
389 .bind::<Text, _>(&expiration.symbol)
390 .bind::<BigInt, _>(expiration.expiry_ts)
391 .get_result::<ExistingSettlementClaim>(conn)
392 .with_context(|| {
393 format!(
394 "Failed to load existing settlement claim for {}/{} expiry {}",
395 expiration.wallet, expiration.symbol, expiration.expiry_ts
396 )
397 })?;
398
399 if claim.settlement_price != expiration.settlement_price
400 || claim.settlement_value != expiration.settlement_value
401 {
402 panic!(
403 "Settlement claim mismatch for {}/{} expiry {}: existing id={} price={} value={}, replay price={} value={}",
404 expiration.wallet,
405 expiration.symbol,
406 expiration.expiry_ts,
407 claim.id,
408 claim.settlement_price,
409 claim.settlement_value,
410 expiration.settlement_price,
411 expiration.settlement_value
412 );
413 }
414
415 Ok(())
416}
417
418fn validate_existing_settlement_ledger_readonly(
419 conn: &mut PgConnection,
420 wallet: &WalletAddress,
421 symbol: &str,
422 ledger_delta: Decimal,
423) -> Result<()> {
424 #[derive(QueryableByName)]
425 struct SettlementLedgerEvent {
426 #[diesel(sql_type = Numeric)]
427 delta: Decimal,
428 }
429
430 let event = diesel::sql_query(
431 "SELECT delta
432 FROM ledger_events
433 WHERE wallet = $1
434 AND reference_symbol = $2
435 AND event_type = 'settlement_realized_pnl'",
436 )
437 .bind::<Binary, _>(wallet)
438 .bind::<Nullable<Text>, _>(Some(symbol))
439 .load::<SettlementLedgerEvent>(conn)?;
440 let deltas = event
441 .iter()
442 .map(|existing| existing.delta)
443 .collect::<Vec<_>>();
444
445 validate_settlement_ledger_deltas(wallet, symbol, ledger_delta, &deltas)
446}
447
448pub fn observe_applied_settlement_in_tx(
451 conn: &mut PgConnection,
452 expiration: &NewPositionExpiration,
453 payout: &NewSettlementPayout,
454 ledger_delta: Decimal,
455) -> Result<hypercall_db::SettlementResult> {
456 validate_existing_settlement_claim_readonly(conn, expiration)?;
457 let Some(existing) = load_settlement_payout_in_tx(conn, payout)? else {
458 panic!(
459 "Partial settlement claim for {}/{} expiry {}: expiration row exists without payout",
460 payout.wallet, payout.symbol, payout.expiry_ts
461 );
462 };
463 validate_existing_settlement_payout_facts(&existing, payout);
464 if !existing.ledger_applied {
465 panic!(
466 "Partial settlement payout for {}/{} expiry {}: payout id={} exists but ledger_applied=false during standby replay",
467 payout.wallet, payout.symbol, payout.expiry_ts, existing.id
468 );
469 }
470 validate_existing_settlement_ledger_readonly(
471 conn,
472 &expiration.wallet,
473 &expiration.symbol,
474 ledger_delta,
475 )?;
476 Ok(hypercall_db::SettlementResult {
477 newly_persisted: false,
478 })
479}
480
481pub fn query_total_fill_volume(conn: &mut PgConnection) -> Result<(i64, Decimal)> {
483 #[derive(QueryableByName)]
484 struct Row {
485 #[diesel(sql_type = BigInt)]
486 fill_count: i64,
487 #[diesel(sql_type = Numeric)]
488 total_notional: Decimal,
489 }
490
491 let result = diesel::sql_query(
492 "SELECT COUNT(*)::bigint AS fill_count, \
493 COALESCE(SUM(ABS(size) / 1000000.0 * price), 0)::numeric AS total_notional \
494 FROM fills",
495 )
496 .get_result::<Row>(conn)?;
497 Ok((result.fill_count, result.total_notional))
498}