Skip to main content

hypercall_db_diesel/
liquidation.rs

//! `LiquidationReader` + `LiquidationWriter` trait implementations for `DieselDb`.
//!
//! Covers liquidation state reads/writes, the history audit trail, the auction
//! lifecycle, and idempotent liquidation bonus audit persistence.
5
6use diesel::prelude::*;
7use diesel_async::{AsyncConnection, RunQueryDsl};
8use rust_decimal::Decimal;
9
10use crate::diesel_db::DieselDb;
11use crate::models::{
12    LiquidationAuctionRecord as DieselLiquidationAuctionRecord,
13    LiquidationHistoryRecord as DieselLiquidationHistoryRecord,
14    LiquidationStateRecord as DieselLiquidationStateRecord, NewLiquidationAuction,
15    NewLiquidationHistory, NewLiquidationState, UpdateLiquidationAuction,
16};
17use crate::schema;
18use hypercall_types::WalletAddress;
19
20// =============================================================================
21// LiquidationReader (async, diesel-async)
22// =============================================================================
23
24#[async_trait::async_trait]
25impl hypercall_db::LiquidationReader for DieselDb {
26    async fn get_liquidation_state(
27        &self,
28        wallet: &WalletAddress,
29    ) -> anyhow::Result<Option<hypercall_db::LiquidationStateRecord>> {
30        use schema::liquidation_states;
31        let mut conn = self.get_conn().await?;
32        let result = liquidation_states::table
33            .filter(liquidation_states::wallet_address.eq(wallet))
34            .first::<DieselLiquidationStateRecord>(&mut conn)
35            .await
36            .optional()?;
37        Ok(result.map(Into::into))
38    }
39
40    async fn get_all_liquidation_states(
41        &self,
42    ) -> anyhow::Result<Vec<hypercall_db::LiquidationStateRecord>> {
43        use schema::liquidation_states;
44        let mut conn = self.get_conn().await?;
45        let results = liquidation_states::table
46            .load::<DieselLiquidationStateRecord>(&mut conn)
47            .await?;
48        Ok(results.into_iter().map(Into::into).collect())
49    }
50
51    async fn get_liquidation_history(
52        &self,
53        wallet: &WalletAddress,
54        limit: i64,
55        offset: i64,
56    ) -> anyhow::Result<(Vec<hypercall_db::LiquidationHistoryRecord>, i64)> {
57        use schema::liquidation_history;
58        let mut conn = self.get_conn().await?;
59
60        let count: i64 = liquidation_history::table
61            .filter(liquidation_history::wallet_address.eq(wallet))
62            .count()
63            .get_result(&mut conn)
64            .await?;
65
66        let results = liquidation_history::table
67            .filter(liquidation_history::wallet_address.eq(wallet))
68            .order((
69                liquidation_history::timestamp.desc(),
70                liquidation_history::id.desc(),
71            ))
72            .limit(limit)
73            .offset(offset)
74            .load::<DieselLiquidationHistoryRecord>(&mut conn)
75            .await?;
76
77        Ok((results.into_iter().map(Into::into).collect(), count))
78    }
79
80    async fn get_recent_liquidation_history(
81        &self,
82        limit: i64,
83    ) -> anyhow::Result<Vec<hypercall_db::LiquidationHistoryRecord>> {
84        use schema::liquidation_history;
85        let mut conn = self.get_conn().await?;
86        let results = liquidation_history::table
87            .order((
88                liquidation_history::timestamp.desc(),
89                liquidation_history::id.desc(),
90            ))
91            .limit(limit)
92            .load::<DieselLiquidationHistoryRecord>(&mut conn)
93            .await?;
94        Ok(results.into_iter().map(Into::into).collect())
95    }
96
97    async fn get_liquidation_auction(
98        &self,
99        auction_id: &str,
100    ) -> anyhow::Result<Option<hypercall_db::LiquidationAuctionRecord>> {
101        use schema::liquidation_auctions;
102        let mut conn = self.get_conn().await?;
103        let result = liquidation_auctions::table
104            .filter(liquidation_auctions::auction_id.eq(auction_id))
105            .first::<DieselLiquidationAuctionRecord>(&mut conn)
106            .await
107            .optional()?;
108        Ok(result.map(Into::into))
109    }
110
111    async fn get_max_liquidation_observed_block(&self) -> anyhow::Result<Option<u64>> {
112        use schema::liquidation_auctions;
113        let mut conn = self.get_conn().await?;
114        let result = liquidation_auctions::table
115            .select(diesel::dsl::max(liquidation_auctions::last_observed_block))
116            .first::<Option<i64>>(&mut conn)
117            .await?;
118        result
119            .map(|v| {
120                u64::try_from(v)
121                    .map_err(|_| anyhow::anyhow!("negative last_observed_block {} in database", v))
122            })
123            .transpose()
124    }
125
126    #[cfg(feature = "test-utils")]
127    async fn get_active_auctions_for_wallet(
128        &self,
129        wallet: &WalletAddress,
130    ) -> anyhow::Result<Vec<hypercall_db::LiquidationAuctionRecord>> {
131        use schema::liquidation_auctions;
132        let mut conn = self.get_conn().await?;
133        let results = liquidation_auctions::table
134            .filter(liquidation_auctions::wallet_address.eq(wallet))
135            .filter(liquidation_auctions::status.eq_any(vec!["pending", "active"]))
136            .order(liquidation_auctions::started_at.desc())
137            .load::<DieselLiquidationAuctionRecord>(&mut conn)
138            .await?;
139        Ok(results.into_iter().map(Into::into).collect())
140    }
141}
142
143// =============================================================================
144// LiquidationWriter (async, diesel-async)
145// =============================================================================
146
147#[async_trait::async_trait]
148impl hypercall_db::LiquidationWriter for DieselDb {
149    async fn save_liquidation_state(
150        &self,
151        state: &hypercall_db::LiquidationStateRecord,
152    ) -> anyhow::Result<()> {
153        use schema::liquidation_states;
154        let diesel_state = NewLiquidationState {
155            wallet_address: state.wallet_address,
156            state: state.state.clone(),
157            margin_mode: state.margin_mode.clone(),
158            equity: state.equity,
159            mm_required: state.mm_required,
160            maintenance_margin: state.maintenance_margin,
161            liquidation_mode: state.liquidation_mode.clone(),
162            target_equity: state.target_equity,
163            entered_pre_liq_at: state.entered_pre_liq_at,
164            mm_shortfall: state.mm_shortfall,
165            escalation_deadline: state.escalation_deadline,
166            last_reprice_at: state.last_reprice_at,
167            partial_order_request_ids: state.partial_order_request_ids.clone(),
168            partial_order_client_ids: state.partial_order_client_ids.clone(),
169            partial_bonus_bps: state.partial_bonus_bps,
170            auction_id: state.auction_id.clone(),
171            request_id: state.request_id.clone(),
172            tx_hash: state.tx_hash.clone(),
173            auction_started_at: state.auction_started_at,
174            chain_start_time: state.chain_start_time,
175            margin_needed: state.margin_needed,
176            stop_request_id: state.stop_request_id.clone(),
177            stop_tx_hash: state.stop_tx_hash.clone(),
178            liquidated_at: state.liquidated_at,
179            resolved_winner: state.resolved_winner,
180            resolved_bonus: state.resolved_bonus,
181            resolution_tx_hash: state.resolution_tx_hash.clone(),
182            last_observed_block: state.last_observed_block,
183            updated_at_ms: state.updated_at_ms,
184        };
185        let mut conn = self.get_conn().await?;
186        diesel::insert_into(liquidation_states::table)
187            .values(&diesel_state)
188            .on_conflict(liquidation_states::wallet_address)
189            .do_update()
190            .set(&diesel_state)
191            .execute(&mut conn)
192            .await?;
193        Ok(())
194    }
195
196    #[cfg(feature = "test-utils")]
197    async fn delete_liquidation_state(&self, wallet: &WalletAddress) -> anyhow::Result<()> {
198        use schema::liquidation_states;
199        let mut conn = self.get_conn().await?;
200        diesel::delete(
201            liquidation_states::table.filter(liquidation_states::wallet_address.eq(wallet)),
202        )
203        .execute(&mut conn)
204        .await?;
205        Ok(())
206    }
207
208    async fn insert_liquidation_history(
209        &self,
210        entry: &hypercall_db::LiquidationHistoryRecord,
211    ) -> anyhow::Result<i64> {
212        use schema::liquidation_history::dsl as lh;
213        let diesel_entry = NewLiquidationHistory {
214            wallet_address: entry.wallet_address,
215            previous_state: entry.previous_state.clone(),
216            new_state: entry.new_state.clone(),
217            liquidation_mode: entry.liquidation_mode.clone(),
218            equity: entry.equity,
219            mm_required: entry.mm_required,
220            maintenance_margin: entry.maintenance_margin,
221            shortfall: entry.shortfall,
222            auction_id: entry.auction_id.clone(),
223            request_id: entry.request_id.clone(),
224            tx_hash: entry.tx_hash.clone(),
225            margin_needed: entry.margin_needed,
226            winner_address: entry.winner_address,
227            bonus: entry.bonus,
228            details: entry.details.clone(),
229            timestamp: entry.timestamp,
230        };
231        let mut conn = self.get_conn().await?;
232        let inserted_id = diesel::insert_into(schema::liquidation_history::table)
233            .values(&diesel_entry)
234            .on_conflict_do_nothing()
235            .returning(lh::id)
236            .get_result::<i64>(&mut conn)
237            .await
238            .optional()?
239            .unwrap_or(0);
240        Ok(inserted_id)
241    }
242
243    async fn create_liquidation_auction(
244        &self,
245        auction: &hypercall_db::LiquidationAuctionRecord,
246    ) -> anyhow::Result<()> {
247        use schema::liquidation_auctions;
248        let diesel_auction = NewLiquidationAuction {
249            auction_id: auction.auction_id.clone(),
250            wallet_address: auction.wallet_address,
251            status: auction.status.clone(),
252            positions: auction.positions.clone(),
253            equity_at_start: auction.equity_at_start,
254            mm_shortfall_at_start: auction.mm_shortfall_at_start,
255            target_equity: auction.target_equity,
256            request_id: auction.request_id.clone(),
257            tx_hash: auction.tx_hash.clone(),
258            started_at: auction.started_at,
259            chain_start_time: auction.chain_start_time,
260            margin_needed: auction.margin_needed,
261            stop_request_id: auction.stop_request_id.clone(),
262            stop_tx_hash: auction.stop_tx_hash.clone(),
263            completed_at: auction.completed_at,
264            liquidator_address: auction.liquidator_address,
265            bonus: auction.bonus,
266            settlement_value: auction.settlement_value,
267            last_observed_block: auction.last_observed_block,
268        };
269        let mut conn = self.get_conn().await?;
270        diesel::insert_into(liquidation_auctions::table)
271            .values(&diesel_auction)
272            .execute(&mut conn)
273            .await?;
274        Ok(())
275    }
276
277    async fn update_liquidation_auction(
278        &self,
279        auction_id: &str,
280        updates: &hypercall_db::LiquidationAuctionUpdate,
281    ) -> anyhow::Result<()> {
282        use schema::liquidation_auctions;
283        let diesel_update = UpdateLiquidationAuction {
284            status: updates.status.clone(),
285            request_id: updates.request_id.clone(),
286            tx_hash: updates.tx_hash.clone(),
287            chain_start_time: updates.chain_start_time,
288            margin_needed: updates.margin_needed,
289            stop_request_id: updates.stop_request_id.clone(),
290            stop_tx_hash: updates.stop_tx_hash.clone(),
291            completed_at: updates.completed_at,
292            liquidator_address: updates.liquidator_address,
293            bonus: updates.bonus,
294            settlement_value: updates.settlement_value,
295            last_observed_block: updates.last_observed_block,
296        };
297        let mut conn = self.get_conn().await?;
298        diesel::update(
299            liquidation_auctions::table.filter(liquidation_auctions::auction_id.eq(auction_id)),
300        )
301        .set(&diesel_update)
302        .execute(&mut conn)
303        .await?;
304        Ok(())
305    }
306
307    async fn claim_and_apply_liquidation_bonus(
308        &self,
309        winner: &WalletAddress,
310        liquidated_wallet: &WalletAddress,
311        auction_id: &str,
312        resolution_tx_hash: &str,
313        bonus: Decimal,
314        event_ts_ms: i64,
315    ) -> anyhow::Result<hypercall_db::LiquidationBonusApplyResult> {
316        use diesel::sql_types::{BigInt, Binary, Numeric, Text};
317
318        #[derive(QueryableByName)]
319        struct ResolutionClaimRow {
320            #[diesel(sql_type = diesel::sql_types::Nullable<Text>)]
321            resolution_tx_hash: Option<String>,
322        }
323        #[derive(QueryableByName)]
324        struct LedgerEventIdRow {
325            #[diesel(sql_type = BigInt)]
326            id: i64,
327        }
328        #[derive(QueryableByName)]
329        struct LiquidationBonusLedgerEventRow {
330            #[diesel(sql_type = BigInt)]
331            id: i64,
332            #[diesel(sql_type = Binary)]
333            wallet: Vec<u8>,
334            #[diesel(sql_type = Numeric)]
335            delta: Decimal,
336        }
337
338        let mut conn = self.get_conn().await?;
339        conn.transaction(async |conn| {
340                let claim = diesel::sql_query(
341                    "SELECT resolution_tx_hash FROM liquidation_states WHERE wallet_address = $1 FOR UPDATE",
342                )
343                .bind::<diesel::sql_types::Bytea, _>(liquidated_wallet)
344                .get_result::<ResolutionClaimRow>(&mut *conn)
345                .await
346                .optional()?
347                .ok_or_else(|| anyhow::anyhow!("missing liquidation state for {} auction {}", liquidated_wallet, auction_id))?;
348
349                if let Some(existing_tx_hash) = claim.resolution_tx_hash.as_deref() {
350                    if existing_tx_hash == resolution_tx_hash {
351                        let ledger_event = diesel::sql_query(
352                            "SELECT id, wallet, delta FROM ledger_events WHERE event_type = 'liquidation_bonus' AND reference_symbol = $1 ORDER BY id DESC LIMIT 1",
353                        )
354                        .bind::<Text, _>(auction_id)
355                        .get_result::<LiquidationBonusLedgerEventRow>(&mut *conn)
356                        .await
357                        .optional()?;
358                        let ledger_event_id = if let Some(event) = ledger_event {
359                            if event.wallet != winner.as_bytes() || event.delta != bonus {
360                                panic!("STATE_CORRUPTION: liquidation bonus replay mismatch for {} auction {}", liquidated_wallet, auction_id);
361                            }
362                            Some(u64::try_from(event.id)?)
363                        } else {
364                            None
365                        };
366                        return Ok(hypercall_db::LiquidationBonusApplyResult {
367                            ledger_event_id,
368                            newly_applied: false,
369                        });
370                    }
371                    panic!("STATE_CORRUPTION: liquidation bonus claim mismatch for {} auction {} existing={} new={}", liquidated_wallet, auction_id, existing_tx_hash, resolution_tx_hash);
372                }
373
374                diesel::sql_query("UPDATE liquidation_states SET resolution_tx_hash = $1 WHERE wallet_address = $2")
375                    .bind::<Text, _>(resolution_tx_hash)
376                    .bind::<diesel::sql_types::Bytea, _>(liquidated_wallet)
377                    .execute(&mut *conn)
378                    .await?;
379
380                let event = diesel::sql_query("INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type, reference_symbol) VALUES ($1, $2, $3, 'liquidation_bonus', $4) RETURNING id")
381                    .bind::<Binary, _>(winner)
382                    .bind::<BigInt, _>(event_ts_ms)
383                    .bind::<Numeric, _>(bonus)
384                    .bind::<Text, _>(auction_id)
385                    .get_result::<LedgerEventIdRow>(&mut *conn)
386                    .await?;
387
388                Ok(hypercall_db::LiquidationBonusApplyResult {
389                    ledger_event_id: Some(u64::try_from(event.id)?),
390                    newly_applied: true,
391                })
392        })
393        .await
394    }
395}
396
397#[cfg(test)]
398mod tests {
399    use crate::test_helpers::TestDb;
400    use hypercall_db::*;
401    use hypercall_types::wallet_address::test_wallet;
402    use rust_decimal_macros::dec;
403
404    #[tokio::test]
405    async fn liquidation_state_empty_db_returns_none() {
406        let test_db = TestDb::new().await.unwrap();
407        let db = test_db.diesel_db().await;
408        let result = LiquidationReader::get_liquidation_state(&db, &test_wallet(40))
409            .await
410            .unwrap();
411        assert!(result.is_none());
412    }
413
414    #[tokio::test]
415    async fn liquidation_all_states_empty_db() {
416        let test_db = TestDb::new().await.unwrap();
417        let db = test_db.diesel_db().await;
418        let liq: &dyn LiquidationReader = &db;
419        let all = liq.get_all_liquidation_states().await.unwrap();
420        assert!(all.is_empty());
421    }
422
423    #[tokio::test]
424    async fn liquidation_max_observed_block_empty_db() {
425        let test_db = TestDb::new().await.unwrap();
426        let db = test_db.diesel_db().await;
427        let liq: &dyn LiquidationReader = &db;
428        let max_block = liq.get_max_liquidation_observed_block().await.unwrap();
429        assert!(max_block.is_none());
430    }
431
432    #[tokio::test]
433    async fn liquidation_save_and_read_roundtrip() {
434        let test_db = TestDb::new().await.unwrap();
435        let db = test_db.diesel_db().await;
436        let wallet = test_wallet(41);
437
438        let record = LiquidationStateRecord {
439            wallet_address: wallet,
440            state: "healthy".to_string(),
441            margin_mode: "standard".to_string(),
442            equity: dec!(10000),
443            mm_required: dec!(5000),
444            maintenance_margin: dec!(5000),
445            liquidation_mode: None,
446            target_equity: None,
447            entered_pre_liq_at: None,
448            mm_shortfall: None,
449            escalation_deadline: None,
450            last_reprice_at: None,
451            partial_order_request_ids: None,
452            partial_order_client_ids: None,
453            partial_bonus_bps: None,
454            auction_id: None,
455            request_id: None,
456            tx_hash: None,
457            auction_started_at: None,
458            chain_start_time: None,
459            margin_needed: None,
460            stop_request_id: None,
461            stop_tx_hash: None,
462            liquidated_at: None,
463            resolved_winner: None,
464            resolved_bonus: None,
465            resolution_tx_hash: None,
466            last_observed_block: None,
467            updated_at_ms: Some(1234567890),
468            created_at: None,
469            updated_at: None,
470        };
471
472        let liq_writer: &dyn LiquidationWriter = &db;
473        liq_writer.save_liquidation_state(&record).await.unwrap();
474
475        let liq_reader: &dyn LiquidationReader = &db;
476        let loaded = liq_reader
477            .get_liquidation_state(&wallet)
478            .await
479            .unwrap()
480            .expect("state should exist after save");
481        assert_eq!(loaded.wallet_address, wallet);
482        assert_eq!(loaded.state, "healthy");
483        assert_eq!(loaded.equity, dec!(10000));
484
485        let all = liq_reader.get_all_liquidation_states().await.unwrap();
486        assert_eq!(all.len(), 1);
487    }
488
489    #[tokio::test]
490    async fn insert_liquidation_history_roundtrip() {
491        let test_db = TestDb::new().await.unwrap();
492        let db = test_db.diesel_db().await;
493        let wallet = test_wallet(42);
494
495        let entry = LiquidationHistoryRecord {
496            id: 0, // ignored on insert
497            wallet_address: wallet,
498            previous_state: "healthy".to_string(),
499            new_state: "PreLiquidation".to_string(),
500            liquidation_mode: Some("partial".to_string()),
501            equity: dec!(4000),
502            mm_required: dec!(5000),
503            maintenance_margin: dec!(5000),
504            shortfall: dec!(1000),
505            auction_id: None,
506            request_id: None,
507            tx_hash: None,
508            margin_needed: None,
509            winner_address: None,
510            bonus: None,
511            details: serde_json::json!({"reason": "mm_breach"}),
512            timestamp: 1700000000000,
513            created_at: None,
514        };
515
516        let liq_writer: &dyn LiquidationWriter = &db;
517        let inserted_id = liq_writer.insert_liquidation_history(&entry).await.unwrap();
518        assert!(inserted_id > 0);
519
520        let liq_reader: &dyn LiquidationReader = &db;
521        let (history, count) = liq_reader
522            .get_liquidation_history(&wallet, 10, 0)
523            .await
524            .unwrap();
525        assert_eq!(count, 1);
526        assert_eq!(history.len(), 1);
527        assert_eq!(history[0].previous_state, "healthy");
528        assert_eq!(history[0].new_state, "PreLiquidation");
529        assert_eq!(history[0].shortfall, dec!(1000));
530        assert_eq!(history[0].wallet_address, wallet);
531    }
532
533    #[tokio::test]
534    async fn create_and_update_auction() {
535        let test_db = TestDb::new().await.unwrap();
536        let db = test_db.diesel_db().await;
537        let wallet = test_wallet(43);
538
539        let auction = LiquidationAuctionRecord {
540            auction_id: "auction-001".to_string(),
541            wallet_address: wallet,
542            status: "pending".to_string(),
543            positions: serde_json::json!([{"symbol": "BTC-PERP", "size": 1}]),
544            equity_at_start: dec!(3000),
545            mm_shortfall_at_start: dec!(2000),
546            target_equity: Some(dec!(5000)),
547            request_id: Some("req-100".to_string()),
548            tx_hash: None,
549            started_at: 1700000000,
550            chain_start_time: None,
551            margin_needed: None,
552            stop_request_id: None,
553            stop_tx_hash: None,
554            completed_at: None,
555            liquidator_address: None,
556            bonus: None,
557            settlement_value: None,
558            last_observed_block: Some(1000),
559            created_at: None,
560            updated_at: None,
561        };
562
563        let liq_writer: &dyn LiquidationWriter = &db;
564        liq_writer
565            .create_liquidation_auction(&auction)
566            .await
567            .unwrap();
568
569        // Read it back
570        let liq_reader: &dyn LiquidationReader = &db;
571        let loaded = liq_reader
572            .get_liquidation_auction("auction-001")
573            .await
574            .unwrap()
575            .expect("auction should exist");
576        assert_eq!(loaded.status, "pending");
577        assert_eq!(loaded.wallet_address, wallet);
578        assert_eq!(loaded.equity_at_start, dec!(3000));
579
580        // Update the auction
581        let updates = LiquidationAuctionUpdate {
582            status: Some("completed".to_string()),
583            request_id: None,
584            tx_hash: Some("0xhash123".to_string()),
585            chain_start_time: Some(1700000100),
586            margin_needed: None,
587            stop_request_id: None,
588            stop_tx_hash: None,
589            completed_at: Some(1700001000),
590            liquidator_address: Some(test_wallet(44)),
591            bonus: Some(dec!(50)),
592            settlement_value: Some(dec!(2500)),
593            last_observed_block: Some(2000),
594        };
595
596        liq_writer
597            .update_liquidation_auction("auction-001", &updates)
598            .await
599            .unwrap();
600
601        // Verify update
602        let updated = liq_reader
603            .get_liquidation_auction("auction-001")
604            .await
605            .unwrap()
606            .expect("auction should still exist");
607        assert_eq!(updated.status, "completed");
608        assert_eq!(updated.tx_hash.as_deref(), Some("0xhash123"));
609        assert_eq!(updated.completed_at, Some(1700001000));
610        assert_eq!(updated.liquidator_address, Some(test_wallet(44)));
611        assert_eq!(updated.bonus, Some(dec!(50)));
612        assert_eq!(updated.last_observed_block, Some(2000));
613    }
614
615    #[tokio::test]
616    async fn claim_and_apply_liquidation_bonus_roundtrip() {
617        let test_db = TestDb::new().await.unwrap();
618        let db = test_db.diesel_db().await;
619        let liquidated = test_wallet(45);
620        let winner = test_wallet(46);
621
622        // Create the auction first (FK on liquidation_states.auction_id)
623        let auction = LiquidationAuctionRecord {
624            auction_id: "auction-bonus".to_string(),
625            wallet_address: liquidated,
626            status: "active".to_string(),
627            positions: serde_json::json!([{"symbol": "BTC-PERP", "size": 1}]),
628            equity_at_start: dec!(1000),
629            mm_shortfall_at_start: dec!(4000),
630            target_equity: None,
631            request_id: None,
632            tx_hash: None,
633            started_at: 1700000000,
634            chain_start_time: None,
635            margin_needed: None,
636            stop_request_id: None,
637            stop_tx_hash: None,
638            completed_at: None,
639            liquidator_address: None,
640            bonus: None,
641            settlement_value: None,
642            last_observed_block: None,
643            created_at: None,
644            updated_at: None,
645        };
646        let liq_writer: &dyn LiquidationWriter = &db;
647        liq_writer
648            .create_liquidation_auction(&auction)
649            .await
650            .unwrap();
651
652        // First, save a liquidation state for the liquidated wallet (required by the method)
653        let state = LiquidationStateRecord {
654            wallet_address: liquidated,
655            state: "in_liquidation".to_string(),
656            margin_mode: "standard".to_string(),
657            equity: dec!(1000),
658            mm_required: dec!(5000),
659            maintenance_margin: dec!(5000),
660            liquidation_mode: Some("full".to_string()),
661            target_equity: None,
662            entered_pre_liq_at: None,
663            mm_shortfall: Some(dec!(4000)),
664            escalation_deadline: None,
665            last_reprice_at: None,
666            partial_order_request_ids: None,
667            partial_order_client_ids: None,
668            partial_bonus_bps: None,
669            auction_id: Some("auction-bonus".to_string()),
670            request_id: None,
671            tx_hash: None,
672            auction_started_at: Some(1700000000),
673            chain_start_time: None,
674            margin_needed: None,
675            stop_request_id: None,
676            stop_tx_hash: None,
677            liquidated_at: Some(1700001000),
678            resolved_winner: None,
679            resolved_bonus: None,
680            resolution_tx_hash: None, // must be None for first claim
681            last_observed_block: None,
682            updated_at_ms: None,
683            created_at: None,
684            updated_at: None,
685        };
686
687        liq_writer.save_liquidation_state(&state).await.unwrap();
688
689        // Claim and apply bonus
690        let result = liq_writer
691            .claim_and_apply_liquidation_bonus(
692                &winner,
693                &liquidated,
694                "auction-bonus",
695                "0xresolution_hash",
696                dec!(200),
697                1700002000000,
698            )
699            .await
700            .unwrap();
701
702        assert!(result.newly_applied);
703        assert!(result.ledger_event_id.is_some());
704
705        // Idempotent replay: same call returns same result
706        let result2 = liq_writer
707            .claim_and_apply_liquidation_bonus(
708                &winner,
709                &liquidated,
710                "auction-bonus",
711                "0xresolution_hash",
712                dec!(200),
713                1700002000000,
714            )
715            .await
716            .unwrap();
717
718        assert!(!result2.newly_applied);
719        assert_eq!(result2.ledger_event_id, result.ledger_event_id);
720    }
721}