Skip to main content

hypercall_db_diesel/
pm_settlement.rs

1use anyhow::{anyhow, Result};
2use diesel::pg::PgConnection;
3use diesel::prelude::*;
4use diesel::sql_types::{
5    BigInt, Bool, Bytea, Integer, Nullable, Numeric, Text, Timestamptz, Uuid as SqlUuid,
6};
7
8use hypercall_types::WalletAddress;
9
10use crate::database_handler::DatabaseHandler;
11use crate::diesel_db::DieselDb;
12use crate::engine_enums::DbUuid;
13
14fn wallet_from_bytes(bytes: Vec<u8>) -> Result<WalletAddress> {
15    let raw: [u8; 20] = bytes
16        .try_into()
17        .map_err(|bytes: Vec<u8>| anyhow!("invalid wallet byte length {}", bytes.len()))?;
18    Ok(WalletAddress::from(raw))
19}
20
21#[derive(QueryableByName)]
22struct PoolRow {
23    #[diesel(sql_type = Text)]
24    underlying: String,
25    #[diesel(sql_type = Integer)]
26    config_version: i32,
27    #[diesel(sql_type = Integer)]
28    policy_version: i32,
29    #[diesel(sql_type = Numeric)]
30    pool_available_usdc: rust_decimal::Decimal,
31    #[diesel(sql_type = Numeric)]
32    pool_target_usdc: rust_decimal::Decimal,
33    #[diesel(sql_type = Numeric)]
34    pool_capacity_usdc: rust_decimal::Decimal,
35    #[diesel(sql_type = Nullable<Numeric>)]
36    pool_utilization: Option<rust_decimal::Decimal>,
37    #[diesel(sql_type = Numeric)]
38    active_timing_bridge_usdc: rust_decimal::Decimal,
39    #[diesel(sql_type = Numeric)]
40    active_settlement_debt_usdc: rust_decimal::Decimal,
41    #[diesel(sql_type = Nullable<Numeric>)]
42    target_short_oi_notional_multiplier: Option<rust_decimal::Decimal>,
43    #[diesel(sql_type = Nullable<Numeric>)]
44    utilization_kink: Option<rust_decimal::Decimal>,
45    #[diesel(sql_type = Nullable<Numeric>)]
46    apr_at_kink: Option<rust_decimal::Decimal>,
47    #[diesel(sql_type = Nullable<Numeric>)]
48    max_apr: Option<rust_decimal::Decimal>,
49    #[diesel(sql_type = Nullable<Numeric>)]
50    normal_utilization_cap: Option<rust_decimal::Decimal>,
51    #[diesel(sql_type = Nullable<Numeric>)]
52    crisis_utilization_cap: Option<rust_decimal::Decimal>,
53    #[diesel(sql_type = Nullable<BigInt>)]
54    bridge_window_ms: Option<i64>,
55    #[diesel(sql_type = BigInt)]
56    last_engine_command_id: i64,
57    #[diesel(sql_type = BigInt)]
58    projection_seq: i64,
59    #[diesel(sql_type = Timestamptz)]
60    updated_at: chrono::DateTime<chrono::Utc>,
61}
62
63impl From<PoolRow> for hypercall_db::PmSettlementPoolProjection {
64    fn from(row: PoolRow) -> Self {
65        Self {
66            underlying: row.underlying,
67            config_version: row.config_version,
68            policy_version: row.policy_version,
69            pool_available_usdc: row.pool_available_usdc,
70            pool_target_usdc: row.pool_target_usdc,
71            pool_capacity_usdc: row.pool_capacity_usdc,
72            pool_utilization: row.pool_utilization,
73            active_timing_bridge_usdc: row.active_timing_bridge_usdc,
74            active_settlement_debt_usdc: row.active_settlement_debt_usdc,
75            target_short_oi_notional_multiplier: row.target_short_oi_notional_multiplier,
76            utilization_kink: row.utilization_kink,
77            apr_at_kink: row.apr_at_kink,
78            max_apr: row.max_apr,
79            normal_utilization_cap: row.normal_utilization_cap,
80            crisis_utilization_cap: row.crisis_utilization_cap,
81            bridge_window_ms: row.bridge_window_ms,
82            last_engine_command_id: row.last_engine_command_id,
83            projection_seq: row.projection_seq,
84            updated_at: row.updated_at,
85        }
86    }
87}
88
89#[derive(QueryableByName)]
90struct PoolGateCountsRow {
91    #[diesel(sql_type = BigInt)]
92    total_pools: i64,
93    #[diesel(sql_type = BigInt)]
94    below_target_pools: i64,
95    #[diesel(sql_type = BigInt)]
96    above_crisis_cap_pools: i64,
97    #[diesel(sql_type = BigInt)]
98    missing_utilization_pools: i64,
99    #[diesel(sql_type = BigInt)]
100    missing_crisis_cap_pools: i64,
101}
102
103impl From<PoolGateCountsRow> for hypercall_db::PmSettlementPoolGateCounts {
104    fn from(row: PoolGateCountsRow) -> Self {
105        Self {
106            total_pools: row.total_pools,
107            below_target_pools: row.below_target_pools,
108            above_crisis_cap_pools: row.above_crisis_cap_pools,
109            missing_utilization_pools: row.missing_utilization_pools,
110            missing_crisis_cap_pools: row.missing_crisis_cap_pools,
111        }
112    }
113}
114
115#[derive(QueryableByName)]
116struct AccountRow {
117    #[diesel(sql_type = Bytea)]
118    wallet: Vec<u8>,
119    #[diesel(sql_type = Text)]
120    underlying: String,
121    #[diesel(sql_type = Text)]
122    status: String,
123    #[diesel(sql_type = Numeric)]
124    timing_bridge_principal_usdc: rust_decimal::Decimal,
125    #[diesel(sql_type = Numeric)]
126    settlement_debt_principal_usdc: rust_decimal::Decimal,
127    #[diesel(sql_type = Numeric)]
128    accrued_interest_usdc: rust_decimal::Decimal,
129    #[diesel(sql_type = BigInt)]
130    interest_cursor_ms: i64,
131    #[diesel(sql_type = Nullable<BigInt>)]
132    bridge_deadline_ms: Option<i64>,
133    #[diesel(sql_type = Nullable<Text>)]
134    active_recovery_plan_id: Option<String>,
135    #[diesel(sql_type = Integer)]
136    policy_version: i32,
137    #[diesel(sql_type = BigInt)]
138    last_engine_command_id: i64,
139    #[diesel(sql_type = BigInt)]
140    projection_seq: i64,
141    #[diesel(sql_type = Timestamptz)]
142    updated_at: chrono::DateTime<chrono::Utc>,
143}
144
145impl TryFrom<AccountRow> for hypercall_db::PmSettlementAccountProjection {
146    type Error = anyhow::Error;
147
148    fn try_from(row: AccountRow) -> Result<Self> {
149        Ok(Self {
150            wallet: wallet_from_bytes(row.wallet)?,
151            underlying: row.underlying,
152            status: row.status,
153            timing_bridge_principal_usdc: row.timing_bridge_principal_usdc,
154            settlement_debt_principal_usdc: row.settlement_debt_principal_usdc,
155            accrued_interest_usdc: row.accrued_interest_usdc,
156            interest_cursor_ms: row.interest_cursor_ms,
157            bridge_deadline_ms: row.bridge_deadline_ms,
158            active_recovery_plan_id: row.active_recovery_plan_id,
159            policy_version: row.policy_version,
160            last_engine_command_id: row.last_engine_command_id,
161            projection_seq: row.projection_seq,
162            updated_at: row.updated_at,
163        })
164    }
165}
166
167#[derive(QueryableByName)]
168struct AccountGateCountsRow {
169    #[diesel(sql_type = BigInt)]
170    total_accounts: i64,
171    #[diesel(sql_type = BigInt)]
172    bridged_accounts: i64,
173    #[diesel(sql_type = BigInt)]
174    debt_accounts: i64,
175    #[diesel(sql_type = BigInt)]
176    overdue_bridge_accounts: i64,
177    #[diesel(sql_type = BigInt)]
178    active_recovery_accounts: i64,
179    #[diesel(sql_type = Numeric)]
180    active_bridge_usdc: rust_decimal::Decimal,
181    #[diesel(sql_type = Numeric)]
182    active_debt_usdc: rust_decimal::Decimal,
183}
184
185impl From<AccountGateCountsRow> for hypercall_db::PmSettlementAccountGateCounts {
186    fn from(row: AccountGateCountsRow) -> Self {
187        Self {
188            total_accounts: row.total_accounts,
189            bridged_accounts: row.bridged_accounts,
190            debt_accounts: row.debt_accounts,
191            overdue_bridge_accounts: row.overdue_bridge_accounts,
192            active_recovery_accounts: row.active_recovery_accounts,
193            active_bridge_usdc: row.active_bridge_usdc,
194            active_debt_usdc: row.active_debt_usdc,
195        }
196    }
197}
198
199#[derive(QueryableByName)]
200struct EventRow {
201    #[diesel(sql_type = Text)]
202    event_key: String,
203    #[diesel(sql_type = Bytea)]
204    wallet: Vec<u8>,
205    #[diesel(sql_type = Text)]
206    underlying: String,
207    #[diesel(sql_type = Text)]
208    event_type: String,
209    #[diesel(sql_type = Numeric)]
210    amount_usdc: rust_decimal::Decimal,
211    #[diesel(sql_type = SqlUuid)]
212    request_id: DbUuid,
213    #[diesel(sql_type = Text)]
214    input_digest: String,
215    #[diesel(sql_type = BigInt)]
216    engine_command_id: i64,
217    #[diesel(sql_type = BigInt)]
218    projection_seq: i64,
219    #[diesel(sql_type = Timestamptz)]
220    created_at: chrono::DateTime<chrono::Utc>,
221}
222
223impl TryFrom<EventRow> for hypercall_db::PmSettlementEventProjection {
224    type Error = anyhow::Error;
225
226    fn try_from(row: EventRow) -> Result<Self> {
227        Ok(Self {
228            event_key: row.event_key,
229            wallet: wallet_from_bytes(row.wallet)?,
230            underlying: row.underlying,
231            event_type: row.event_type,
232            amount_usdc: row.amount_usdc,
233            request_id: row.request_id.into(),
234            input_digest: row.input_digest,
235            engine_command_id: row.engine_command_id,
236            projection_seq: row.projection_seq,
237            created_at: row.created_at,
238        })
239    }
240}
241
242#[derive(QueryableByName)]
243struct InterestEventRow {
244    #[diesel(sql_type = SqlUuid)]
245    request_id: DbUuid,
246    #[diesel(sql_type = Bytea)]
247    wallet: Vec<u8>,
248    #[diesel(sql_type = Text)]
249    underlying: String,
250    #[diesel(sql_type = BigInt)]
251    from_ms: i64,
252    #[diesel(sql_type = BigInt)]
253    to_ms: i64,
254    #[diesel(sql_type = Numeric)]
255    utilization: rust_decimal::Decimal,
256    #[diesel(sql_type = Numeric)]
257    apr: rust_decimal::Decimal,
258    #[diesel(sql_type = Numeric)]
259    interest_usdc: rust_decimal::Decimal,
260    #[diesel(sql_type = Integer)]
261    policy_version: i32,
262    #[diesel(sql_type = BigInt)]
263    engine_command_id: i64,
264    #[diesel(sql_type = BigInt)]
265    projection_seq: i64,
266    #[diesel(sql_type = Timestamptz)]
267    created_at: chrono::DateTime<chrono::Utc>,
268}
269
270impl TryFrom<InterestEventRow> for hypercall_db::PmSettlementInterestEventProjection {
271    type Error = anyhow::Error;
272
273    fn try_from(row: InterestEventRow) -> Result<Self> {
274        Ok(Self {
275            request_id: row.request_id.into(),
276            wallet: wallet_from_bytes(row.wallet)?,
277            underlying: row.underlying,
278            from_ms: row.from_ms,
279            to_ms: row.to_ms,
280            utilization: row.utilization,
281            apr: row.apr,
282            interest_usdc: row.interest_usdc,
283            policy_version: row.policy_version,
284            engine_command_id: row.engine_command_id,
285            projection_seq: row.projection_seq,
286            created_at: row.created_at,
287        })
288    }
289}
290
291#[derive(QueryableByName)]
292struct RepaymentEventRow {
293    #[diesel(sql_type = SqlUuid)]
294    request_id: DbUuid,
295    #[diesel(sql_type = Bytea)]
296    wallet: Vec<u8>,
297    #[diesel(sql_type = Text)]
298    underlying: String,
299    #[diesel(sql_type = Numeric)]
300    amount_usdc: rust_decimal::Decimal,
301    #[diesel(sql_type = Numeric)]
302    interest_paid_usdc: rust_decimal::Decimal,
303    #[diesel(sql_type = Numeric)]
304    principal_paid_usdc: rust_decimal::Decimal,
305    #[diesel(sql_type = Text)]
306    reason: String,
307    #[diesel(sql_type = Text)]
308    source_event_id: String,
309    #[diesel(sql_type = BigInt)]
310    engine_command_id: i64,
311    #[diesel(sql_type = BigInt)]
312    projection_seq: i64,
313    #[diesel(sql_type = Timestamptz)]
314    created_at: chrono::DateTime<chrono::Utc>,
315}
316
317impl TryFrom<RepaymentEventRow> for hypercall_db::PmSettlementRepaymentEventProjection {
318    type Error = anyhow::Error;
319
320    fn try_from(row: RepaymentEventRow) -> Result<Self> {
321        Ok(Self {
322            request_id: row.request_id.into(),
323            wallet: wallet_from_bytes(row.wallet)?,
324            underlying: row.underlying,
325            amount_usdc: row.amount_usdc,
326            interest_paid_usdc: row.interest_paid_usdc,
327            principal_paid_usdc: row.principal_paid_usdc,
328            reason: row.reason,
329            source_event_id: row.source_event_id,
330            engine_command_id: row.engine_command_id,
331            projection_seq: row.projection_seq,
332            created_at: row.created_at,
333        })
334    }
335}
336
337#[derive(QueryableByName)]
338struct RecoveryPlanRow {
339    #[diesel(sql_type = Text)]
340    plan_id: String,
341    #[diesel(sql_type = Bytea)]
342    wallet: Vec<u8>,
343    #[diesel(sql_type = Text)]
344    underlying: String,
345    #[diesel(sql_type = Text)]
346    status: String,
347    #[diesel(sql_type = Bool)]
348    placeholder: bool,
349    #[diesel(sql_type = Nullable<Text>)]
350    trigger: Option<String>,
351    #[diesel(sql_type = Nullable<Text>)]
352    reason: Option<String>,
353    #[diesel(sql_type = Nullable<Integer>)]
354    policy_version: Option<i32>,
355    #[diesel(sql_type = Nullable<Integer>)]
356    recovery_priority_version: Option<i32>,
357    #[diesel(sql_type = Nullable<Numeric>)]
358    target_reduction_usdc: Option<rust_decimal::Decimal>,
359    #[diesel(sql_type = Nullable<Numeric>)]
360    expected_usdc_recovered: Option<rust_decimal::Decimal>,
361    #[diesel(sql_type = Nullable<Numeric>)]
362    expected_obligation_reduced: Option<rust_decimal::Decimal>,
363    #[diesel(sql_type = Nullable<Numeric>)]
364    expected_impact_usdc: Option<rust_decimal::Decimal>,
365    #[diesel(sql_type = Nullable<Numeric>)]
366    post_plan_utilization: Option<rust_decimal::Decimal>,
367    #[diesel(sql_type = BigInt)]
368    engine_command_id: i64,
369    #[diesel(sql_type = BigInt)]
370    projection_seq: i64,
371    #[diesel(sql_type = Timestamptz)]
372    created_at: chrono::DateTime<chrono::Utc>,
373    #[diesel(sql_type = Timestamptz)]
374    updated_at: chrono::DateTime<chrono::Utc>,
375}
376
377impl TryFrom<RecoveryPlanRow> for hypercall_db::PmRecoveryPlanProjection {
378    type Error = anyhow::Error;
379
380    fn try_from(row: RecoveryPlanRow) -> Result<Self> {
381        Ok(Self {
382            plan_id: row.plan_id,
383            wallet: wallet_from_bytes(row.wallet)?,
384            underlying: row.underlying,
385            status: row.status,
386            placeholder: row.placeholder,
387            trigger: row.trigger,
388            reason: row.reason,
389            policy_version: row.policy_version,
390            recovery_priority_version: row.recovery_priority_version,
391            target_reduction_usdc: row.target_reduction_usdc,
392            expected_usdc_recovered: row.expected_usdc_recovered,
393            expected_obligation_reduced: row.expected_obligation_reduced,
394            expected_impact_usdc: row.expected_impact_usdc,
395            post_plan_utilization: row.post_plan_utilization,
396            engine_command_id: row.engine_command_id,
397            projection_seq: row.projection_seq,
398            created_at: row.created_at,
399            updated_at: row.updated_at,
400        })
401    }
402}
403
404#[derive(QueryableByName)]
405struct RecoveryActionRow {
406    #[diesel(sql_type = Text)]
407    plan_id: String,
408    #[diesel(sql_type = Text)]
409    action_id: String,
410    #[diesel(sql_type = Text)]
411    action_type: String,
412    #[diesel(sql_type = Text)]
413    status: String,
414    #[diesel(sql_type = Nullable<Text>)]
415    target: Option<String>,
416    #[diesel(sql_type = Nullable<Integer>)]
417    attempt: Option<i32>,
418    #[diesel(sql_type = Nullable<Text>)]
419    external_id: Option<String>,
420    #[diesel(sql_type = Nullable<Text>)]
421    external_kind: Option<String>,
422    #[diesel(sql_type = Nullable<Text>)]
423    result_external_id: Option<String>,
424    #[diesel(sql_type = Nullable<Text>)]
425    result: Option<String>,
426    #[diesel(sql_type = Nullable<Numeric>)]
427    expected_usdc_recovered: Option<rust_decimal::Decimal>,
428    #[diesel(sql_type = Nullable<Numeric>)]
429    expected_obligation_reduced: Option<rust_decimal::Decimal>,
430    #[diesel(sql_type = Nullable<Numeric>)]
431    expected_impact_usdc: Option<rust_decimal::Decimal>,
432    #[diesel(sql_type = Nullable<Numeric>)]
433    recovered_usdc: Option<rust_decimal::Decimal>,
434    #[diesel(sql_type = Nullable<Numeric>)]
435    liability_reduction_usdc: Option<rust_decimal::Decimal>,
436    #[diesel(sql_type = BigInt)]
437    engine_command_id: i64,
438    #[diesel(sql_type = BigInt)]
439    projection_seq: i64,
440    #[diesel(sql_type = Timestamptz)]
441    created_at: chrono::DateTime<chrono::Utc>,
442    #[diesel(sql_type = Timestamptz)]
443    updated_at: chrono::DateTime<chrono::Utc>,
444}
445
446impl From<RecoveryActionRow> for hypercall_db::PmRecoveryActionProjection {
447    fn from(row: RecoveryActionRow) -> Self {
448        Self {
449            plan_id: row.plan_id,
450            action_id: row.action_id,
451            action_type: row.action_type,
452            status: row.status,
453            target: row.target,
454            attempt: row.attempt,
455            external_id: row.external_id,
456            external_kind: row.external_kind,
457            result_external_id: row.result_external_id,
458            result: row.result,
459            expected_usdc_recovered: row.expected_usdc_recovered,
460            expected_obligation_reduced: row.expected_obligation_reduced,
461            expected_impact_usdc: row.expected_impact_usdc,
462            recovered_usdc: row.recovered_usdc,
463            liability_reduction_usdc: row.liability_reduction_usdc,
464            engine_command_id: row.engine_command_id,
465            projection_seq: row.projection_seq,
466            created_at: row.created_at,
467            updated_at: row.updated_at,
468        }
469    }
470}
471
472#[derive(QueryableByName)]
473struct RecoveryActionGateCountsRow {
474    #[diesel(sql_type = BigInt)]
475    total_actions: i64,
476    #[diesel(sql_type = BigInt)]
477    planned_actions: i64,
478    #[diesel(sql_type = BigInt)]
479    submitted_actions: i64,
480    #[diesel(sql_type = BigInt)]
481    terminal_actions: i64,
482}
483
484impl From<RecoveryActionGateCountsRow> for hypercall_db::PmRecoveryActionGateCounts {
485    fn from(row: RecoveryActionGateCountsRow) -> Self {
486        Self {
487            total_actions: row.total_actions,
488            planned_actions: row.planned_actions,
489            submitted_actions: row.submitted_actions,
490            terminal_actions: row.terminal_actions,
491        }
492    }
493}
494
495fn apply_pm_settlement_projection_write(
496    conn: &mut PgConnection,
497    write: &hypercall_db::PmSettlementProjectionWrite,
498) -> QueryResult<usize> {
499    match write {
500        hypercall_db::PmSettlementProjectionWrite::Pool(pool) => {
501            diesel::sql_query(
502                "INSERT INTO pm_settlement_pools (
503                    underlying, config_version, policy_version, pool_available_usdc,
504                    pool_target_usdc, pool_capacity_usdc, pool_utilization,
505                    active_timing_bridge_usdc, active_settlement_debt_usdc,
506                    target_short_oi_notional_multiplier, utilization_kink, apr_at_kink,
507                    max_apr, normal_utilization_cap, crisis_utilization_cap,
508                    bridge_window_ms, projection_seq, updated_at
509                 ) VALUES (
510                    $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, to_timestamp($18::double precision / 1000.0)
511                 )
512                 ON CONFLICT (underlying) DO UPDATE SET
513                    config_version = EXCLUDED.config_version,
514                    policy_version = EXCLUDED.policy_version,
515                    pool_available_usdc = EXCLUDED.pool_available_usdc,
516                    pool_target_usdc = EXCLUDED.pool_target_usdc,
517                    pool_capacity_usdc = EXCLUDED.pool_capacity_usdc,
518                    pool_utilization = EXCLUDED.pool_utilization,
519                    active_timing_bridge_usdc = EXCLUDED.active_timing_bridge_usdc,
520                    active_settlement_debt_usdc = EXCLUDED.active_settlement_debt_usdc,
521                    target_short_oi_notional_multiplier = EXCLUDED.target_short_oi_notional_multiplier,
522                    utilization_kink = EXCLUDED.utilization_kink,
523                    apr_at_kink = EXCLUDED.apr_at_kink,
524                    max_apr = EXCLUDED.max_apr,
525                    normal_utilization_cap = EXCLUDED.normal_utilization_cap,
526                    crisis_utilization_cap = EXCLUDED.crisis_utilization_cap,
527                    bridge_window_ms = EXCLUDED.bridge_window_ms,
528                    projection_seq = GREATEST(pm_settlement_pools.projection_seq, EXCLUDED.projection_seq),
529                    updated_at = EXCLUDED.updated_at
530                 WHERE pm_settlement_pools.projection_seq <= EXCLUDED.projection_seq",
531            )
532            .bind::<Text, _>(&pool.underlying)
533            .bind::<Integer, _>(pool.config_version)
534            .bind::<Integer, _>(pool.policy_version)
535            .bind::<Numeric, _>(pool.pool_available_usdc)
536            .bind::<Numeric, _>(pool.pool_target_usdc)
537            .bind::<Numeric, _>(pool.pool_capacity_usdc)
538            .bind::<Nullable<Numeric>, _>(pool.pool_utilization)
539            .bind::<Numeric, _>(pool.active_timing_bridge_usdc)
540            .bind::<Numeric, _>(pool.active_settlement_debt_usdc)
541            .bind::<Nullable<Numeric>, _>(pool.target_short_oi_notional_multiplier)
542            .bind::<Nullable<Numeric>, _>(pool.utilization_kink)
543            .bind::<Nullable<Numeric>, _>(pool.apr_at_kink)
544            .bind::<Nullable<Numeric>, _>(pool.max_apr)
545            .bind::<Nullable<Numeric>, _>(pool.normal_utilization_cap)
546            .bind::<Nullable<Numeric>, _>(pool.crisis_utilization_cap)
547            .bind::<Nullable<BigInt>, _>(pool.bridge_window_ms)
548            .bind::<BigInt, _>(pool.projection_seq)
549            .bind::<BigInt, _>(pool.updated_at_ms)
550            .execute(conn)
551        }
552        hypercall_db::PmSettlementProjectionWrite::Account(account) => {
553            diesel::sql_query(
554                "INSERT INTO pm_settlement_accounts (
555                    wallet, underlying, status, timing_bridge_principal_usdc,
556                    settlement_debt_principal_usdc, accrued_interest_usdc,
557                    interest_cursor_ms, bridge_deadline_ms, active_recovery_plan_id,
558                    policy_version, projection_seq, updated_at
559                 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, to_timestamp($12::double precision / 1000.0))
560                 ON CONFLICT (wallet, underlying) DO UPDATE SET
561                    status = EXCLUDED.status,
562                    timing_bridge_principal_usdc = EXCLUDED.timing_bridge_principal_usdc,
563                    settlement_debt_principal_usdc = EXCLUDED.settlement_debt_principal_usdc,
564                    accrued_interest_usdc = EXCLUDED.accrued_interest_usdc,
565                    interest_cursor_ms = EXCLUDED.interest_cursor_ms,
566                    bridge_deadline_ms = EXCLUDED.bridge_deadline_ms,
567                    active_recovery_plan_id = EXCLUDED.active_recovery_plan_id,
568                    policy_version = EXCLUDED.policy_version,
569                    projection_seq = GREATEST(pm_settlement_accounts.projection_seq, EXCLUDED.projection_seq),
570                    updated_at = EXCLUDED.updated_at
571                 WHERE pm_settlement_accounts.projection_seq <= EXCLUDED.projection_seq",
572            )
573            .bind::<Bytea, _>(account.wallet.as_bytes())
574            .bind::<Text, _>(&account.underlying)
575            .bind::<Text, _>(&account.status)
576            .bind::<Numeric, _>(account.timing_bridge_principal_usdc)
577            .bind::<Numeric, _>(account.settlement_debt_principal_usdc)
578            .bind::<Numeric, _>(account.accrued_interest_usdc)
579            .bind::<BigInt, _>(account.interest_cursor_ms)
580            .bind::<Nullable<BigInt>, _>(account.bridge_deadline_ms)
581            .bind::<Nullable<Text>, _>(&account.active_recovery_plan_id)
582            .bind::<Integer, _>(account.policy_version)
583            .bind::<BigInt, _>(account.projection_seq)
584            .bind::<BigInt, _>(account.updated_at_ms)
585            .execute(conn)
586        }
587        hypercall_db::PmSettlementProjectionWrite::Event(event) => {
588            diesel::sql_query(
589                "INSERT INTO pm_settlement_events (
590                    event_key, wallet, underlying, event_type, amount_usdc,
591                    request_id, input_digest
592                 ) VALUES ($1, $2, $3, $4, $5, $6, $7)
593                 ON CONFLICT (event_key) DO UPDATE SET
594                    event_type = EXCLUDED.event_type,
595                    amount_usdc = EXCLUDED.amount_usdc,
596                    request_id = EXCLUDED.request_id,
597                    input_digest = EXCLUDED.input_digest",
598            )
599            .bind::<Text, _>(&event.event_key)
600            .bind::<Bytea, _>(event.wallet.as_bytes())
601            .bind::<Text, _>(&event.underlying)
602            .bind::<Text, _>(&event.event_type)
603            .bind::<Numeric, _>(event.amount_usdc)
604            .bind::<SqlUuid, _>(DbUuid::from(event.request_id))
605            .bind::<Text, _>(&event.input_digest)
606            .execute(conn)
607        }
608        hypercall_db::PmSettlementProjectionWrite::InterestEvent(event) => {
609            diesel::sql_query(
610                "INSERT INTO pm_settlement_interest_events (
611                    request_id, wallet, underlying, from_ms, to_ms, utilization, apr,
612                    interest_usdc, policy_version
613                 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
614                 ON CONFLICT (request_id) DO NOTHING",
615            )
616            .bind::<SqlUuid, _>(DbUuid(event.request_id))
617            .bind::<Bytea, _>(event.wallet.as_bytes())
618            .bind::<Text, _>(&event.underlying)
619            .bind::<BigInt, _>(event.from_ms)
620            .bind::<BigInt, _>(event.to_ms)
621            .bind::<Numeric, _>(event.utilization)
622            .bind::<Numeric, _>(event.apr)
623            .bind::<Numeric, _>(event.interest_usdc)
624            .bind::<Integer, _>(event.policy_version)
625            .execute(conn)
626        }
627        hypercall_db::PmSettlementProjectionWrite::RepaymentEvent(event) => {
628            diesel::sql_query(
629                "INSERT INTO pm_settlement_repayment_events (
630                    request_id, wallet, underlying, amount_usdc, interest_paid_usdc,
631                    principal_paid_usdc, reason, source_event_id
632                 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
633                 ON CONFLICT (request_id) DO NOTHING",
634            )
635            .bind::<SqlUuid, _>(DbUuid(event.request_id))
636            .bind::<Bytea, _>(event.wallet.as_bytes())
637            .bind::<Text, _>(&event.underlying)
638            .bind::<Numeric, _>(event.amount_usdc)
639            .bind::<Numeric, _>(event.interest_paid_usdc)
640            .bind::<Numeric, _>(event.principal_paid_usdc)
641            .bind::<Text, _>(&event.reason)
642            .bind::<Text, _>(&event.source_event_id)
643            .execute(conn)
644        }
645        hypercall_db::PmSettlementProjectionWrite::VaultDeposit(deposit) => {
646            diesel::sql_query(
647                "INSERT INTO pm_vault_deposits (
648                    deposit_id, depositor, underlying, principal_usdc, remaining_usdc,
649                    withdrawn_usdc, reserved_withdrawal_usdc, chain_id, source_contract_address,
650                    tx_hash, log_index, max_listed_expiry_ts_ms, settlement_grace_ms,
651                    lock_until_ms, status, projection_seq, created_at, updated_at
652                 ) VALUES (
653                    $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
654                    $15::pm_vault_deposit_status, $16, to_timestamp($17::double precision / 1000.0),
655                    to_timestamp($18::double precision / 1000.0)
656                 )
657                 ON CONFLICT (deposit_id) DO UPDATE SET
658                    remaining_usdc = EXCLUDED.remaining_usdc,
659                    withdrawn_usdc = EXCLUDED.withdrawn_usdc,
660                    reserved_withdrawal_usdc = EXCLUDED.reserved_withdrawal_usdc,
661                    status = EXCLUDED.status,
662                    projection_seq = GREATEST(pm_vault_deposits.projection_seq, EXCLUDED.projection_seq),
663                    updated_at = EXCLUDED.updated_at
664                 WHERE pm_vault_deposits.projection_seq <= EXCLUDED.projection_seq",
665            )
666            .bind::<SqlUuid, _>(DbUuid(deposit.deposit_id))
667            .bind::<Bytea, _>(deposit.depositor.as_bytes())
668            .bind::<Text, _>(&deposit.underlying)
669            .bind::<Numeric, _>(deposit.principal_usdc)
670            .bind::<Numeric, _>(deposit.remaining_usdc)
671            .bind::<Numeric, _>(deposit.withdrawn_usdc)
672            .bind::<Numeric, _>(deposit.reserved_withdrawal_usdc)
673            .bind::<BigInt, _>(deposit.chain_id)
674            .bind::<Bytea, _>(deposit.source_contract_address.as_bytes())
675            .bind::<Text, _>(&deposit.tx_hash)
676            .bind::<Integer, _>(deposit.log_index)
677            .bind::<BigInt, _>(deposit.max_listed_expiry_ts_ms)
678            .bind::<BigInt, _>(deposit.settlement_grace_ms)
679            .bind::<BigInt, _>(deposit.lock_until_ms)
680            .bind::<Text, _>(deposit.status.as_str())
681            .bind::<BigInt, _>(deposit.projection_seq)
682            .bind::<BigInt, _>(deposit.created_at_ms)
683            .bind::<BigInt, _>(deposit.updated_at_ms)
684            .execute(conn)
685        }
686        hypercall_db::PmSettlementProjectionWrite::VaultWithdrawal(withdrawal) => {
687            diesel::sql_query(
688                "INSERT INTO pm_vault_withdrawals (
689                    withdrawal_id, deposit_id, depositor, underlying, amount_usdc,
690                    lock_until_ms, status, projection_seq, requested_at, updated_at
691                 ) VALUES (
692                    $1, $2, $3, $4, $5, $6, $7::pm_vault_withdrawal_status, $8,
693                    to_timestamp($9::double precision / 1000.0),
694                    to_timestamp($10::double precision / 1000.0)
695                 )
696                 ON CONFLICT (withdrawal_id) DO UPDATE SET
697                    status = EXCLUDED.status,
698                    projection_seq = GREATEST(pm_vault_withdrawals.projection_seq, EXCLUDED.projection_seq),
699                    updated_at = EXCLUDED.updated_at
700                 WHERE pm_vault_withdrawals.projection_seq <= EXCLUDED.projection_seq",
701            )
702            .bind::<SqlUuid, _>(DbUuid(withdrawal.withdrawal_id))
703            .bind::<SqlUuid, _>(DbUuid(withdrawal.deposit_id))
704            .bind::<Bytea, _>(withdrawal.depositor.as_bytes())
705            .bind::<Text, _>(&withdrawal.underlying)
706            .bind::<Numeric, _>(withdrawal.amount_usdc)
707            .bind::<BigInt, _>(withdrawal.lock_until_ms)
708            .bind::<Text, _>(withdrawal.status.as_str())
709            .bind::<BigInt, _>(withdrawal.projection_seq)
710            .bind::<BigInt, _>(withdrawal.requested_at_ms)
711            .bind::<BigInt, _>(withdrawal.updated_at_ms)
712            .execute(conn)
713        }
714        hypercall_db::PmSettlementProjectionWrite::RecoveryPlan(plan) => {
715            let updated = diesel::sql_query(
716                "INSERT INTO pm_recovery_plans (
717                    plan_id, wallet, underlying, status, placeholder, trigger, reason,
718                    policy_version, recovery_priority_version, target_reduction_usdc,
719                    expected_usdc_recovered, expected_obligation_reduced, expected_impact_usdc,
720                    post_plan_utilization, projection_seq, updated_at
721                 ) VALUES (
722                    $1, $2, $3, $4, false, $5, $6, $7, $8, $9, $10, $11, $12, $13,
723                    $14, to_timestamp($15::double precision / 1000.0)
724                 )
725                 ON CONFLICT (plan_id) DO UPDATE SET
726                    wallet = EXCLUDED.wallet,
727                    underlying = EXCLUDED.underlying,
728                    status = EXCLUDED.status,
729                    placeholder = false,
730                    trigger = EXCLUDED.trigger,
731                    reason = EXCLUDED.reason,
732                    policy_version = EXCLUDED.policy_version,
733                    recovery_priority_version = EXCLUDED.recovery_priority_version,
734                    target_reduction_usdc = EXCLUDED.target_reduction_usdc,
735                    expected_usdc_recovered = EXCLUDED.expected_usdc_recovered,
736                    expected_obligation_reduced = EXCLUDED.expected_obligation_reduced,
737                    expected_impact_usdc = EXCLUDED.expected_impact_usdc,
738                    post_plan_utilization = EXCLUDED.post_plan_utilization,
739                    projection_seq = GREATEST(pm_recovery_plans.projection_seq, EXCLUDED.projection_seq),
740                    updated_at = EXCLUDED.updated_at
741                 WHERE pm_recovery_plans.projection_seq <= EXCLUDED.projection_seq",
742            )
743            .bind::<Text, _>(&plan.plan_id)
744            .bind::<Bytea, _>(plan.wallet.as_bytes())
745            .bind::<Text, _>(&plan.underlying)
746            .bind::<Text, _>(&plan.status)
747            .bind::<Text, _>(&plan.trigger)
748            .bind::<Text, _>(&plan.reason)
749            .bind::<Integer, _>(plan.policy_version)
750            .bind::<Integer, _>(plan.recovery_priority_version)
751            .bind::<Numeric, _>(plan.target_reduction_usdc)
752            .bind::<Numeric, _>(plan.expected_usdc_recovered)
753            .bind::<Numeric, _>(plan.expected_obligation_reduced)
754            .bind::<Numeric, _>(plan.expected_impact_usdc)
755            .bind::<Nullable<Numeric>, _>(plan.post_plan_utilization)
756            .bind::<BigInt, _>(plan.projection_seq)
757            .bind::<BigInt, _>(plan.updated_at_ms)
758            .execute(conn)?;
759
760            for action in &plan.actions {
761                diesel::sql_query(
762                    "INSERT INTO pm_recovery_actions (
763                        plan_id, action_id, action_type, status, target, attempt,
764                        external_id, external_kind, result_external_id, result, expected_usdc_recovered,
765                        expected_obligation_reduced, expected_impact_usdc, recovered_usdc,
766                        liability_reduction_usdc, projection_seq, updated_at
767                     ) VALUES (
768                        $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
769                        $15, $16, to_timestamp($17::double precision / 1000.0)
770                     )
771                     ON CONFLICT (plan_id, action_id) DO UPDATE SET
772                        action_type = EXCLUDED.action_type,
773                        status = EXCLUDED.status,
774                        target = EXCLUDED.target,
775                        attempt = EXCLUDED.attempt,
776                        external_id = EXCLUDED.external_id,
777                        external_kind = EXCLUDED.external_kind,
778                        result_external_id = EXCLUDED.result_external_id,
779                        result = EXCLUDED.result,
780                        expected_usdc_recovered = EXCLUDED.expected_usdc_recovered,
781                        expected_obligation_reduced = EXCLUDED.expected_obligation_reduced,
782                        expected_impact_usdc = EXCLUDED.expected_impact_usdc,
783                        recovered_usdc = EXCLUDED.recovered_usdc,
784                        liability_reduction_usdc = EXCLUDED.liability_reduction_usdc,
785                        projection_seq = GREATEST(pm_recovery_actions.projection_seq, EXCLUDED.projection_seq),
786                        updated_at = EXCLUDED.updated_at
787                     WHERE pm_recovery_actions.projection_seq <= EXCLUDED.projection_seq",
788                )
789                .bind::<Text, _>(&action.plan_id)
790                .bind::<Text, _>(&action.action_id)
791                .bind::<Text, _>(&action.action_type)
792                .bind::<Text, _>(&action.status)
793                .bind::<Text, _>(&action.target)
794                .bind::<Integer, _>(action.attempt)
795                .bind::<Nullable<Text>, _>(&action.external_id)
796                .bind::<Nullable<Text>, _>(&action.external_kind)
797                .bind::<Nullable<Text>, _>(&action.result_external_id)
798                .bind::<Nullable<Text>, _>(&action.result)
799                .bind::<Numeric, _>(action.expected_usdc_recovered)
800                .bind::<Numeric, _>(action.expected_obligation_reduced)
801                .bind::<Numeric, _>(action.expected_impact_usdc)
802                .bind::<Numeric, _>(action.recovered_usdc)
803                .bind::<Numeric, _>(action.liability_reduction_usdc)
804                .bind::<BigInt, _>(action.projection_seq)
805                .bind::<BigInt, _>(action.updated_at_ms)
806                .execute(conn)?;
807            }
808            Ok(updated)
809        }
810    }
811}
812
813#[async_trait::async_trait]
814impl hypercall_db::PmSettlementProjectionReader for DieselDb {
815    async fn list_pm_settlement_pools(
816        &self,
817    ) -> Result<Vec<hypercall_db::PmSettlementPoolProjection>> {
818        let mut conn = self.get_conn().await?;
819        let rows = diesel_async::RunQueryDsl::load::<PoolRow>(
820            diesel::sql_query("SELECT * FROM pm_settlement_pools ORDER BY underlying LIMIT 1000"),
821            &mut conn,
822        )
823        .await?;
824        Ok(rows.into_iter().map(Into::into).collect())
825    }
826
827    async fn pm_settlement_pool_gate_counts(
828        &self,
829    ) -> Result<hypercall_db::PmSettlementPoolGateCounts> {
830        let mut conn = self.get_conn().await?;
831        let row = diesel_async::RunQueryDsl::get_result::<PoolGateCountsRow>(
832            diesel::sql_query(
833                "SELECT
834                    COUNT(*)::bigint AS total_pools,
835                    COUNT(*) FILTER (WHERE pool_available_usdc < pool_target_usdc)::bigint AS below_target_pools,
836                    COUNT(*) FILTER (
837                        WHERE pool_utilization IS NOT NULL
838                          AND crisis_utilization_cap IS NOT NULL
839                          AND pool_utilization > crisis_utilization_cap
840                    )::bigint AS above_crisis_cap_pools,
841                    COUNT(*) FILTER (WHERE pool_utilization IS NULL)::bigint AS missing_utilization_pools,
842                    COUNT(*) FILTER (WHERE crisis_utilization_cap IS NULL)::bigint AS missing_crisis_cap_pools
843                 FROM pm_settlement_pools",
844            ),
845            &mut conn,
846        )
847        .await?;
848        Ok(row.into())
849    }
850
851    async fn list_pm_settlement_accounts(
852        &self,
853    ) -> Result<Vec<hypercall_db::PmSettlementAccountProjection>> {
854        let mut conn = self.get_conn().await?;
855        let rows = diesel_async::RunQueryDsl::load::<AccountRow>(
856            diesel::sql_query(
857                "SELECT * FROM pm_settlement_accounts ORDER BY underlying, wallet LIMIT 1000",
858            ),
859            &mut conn,
860        )
861        .await?;
862        rows.into_iter().map(TryInto::try_into).collect()
863    }
864
865    async fn pm_settlement_account_gate_counts(
866        &self,
867        now_ms: i64,
868    ) -> Result<hypercall_db::PmSettlementAccountGateCounts> {
869        let mut conn = self.get_conn().await?;
870        let row = diesel_async::RunQueryDsl::get_result::<AccountGateCountsRow>(
871            diesel::sql_query(
872                "SELECT
873                    COUNT(*)::bigint AS total_accounts,
874                    COUNT(*) FILTER (WHERE timing_bridge_principal_usdc > 0)::bigint AS bridged_accounts,
875                    COUNT(*) FILTER (WHERE settlement_debt_principal_usdc > 0)::bigint AS debt_accounts,
876                    COUNT(*) FILTER (
877                        WHERE timing_bridge_principal_usdc > 0
878                          AND (
879                              bridge_deadline_ms IS NULL
880                              OR bridge_deadline_ms < $1
881                          )
882                    )::bigint AS overdue_bridge_accounts,
883                    COUNT(*) FILTER (
884                        WHERE active_recovery_plan_id IS NOT NULL
885                          AND (
886                              timing_bridge_principal_usdc > 0
887                              OR settlement_debt_principal_usdc > 0
888                              OR accrued_interest_usdc > 0
889                          )
890                    )::bigint AS active_recovery_accounts,
891                    COALESCE(SUM(timing_bridge_principal_usdc) FILTER (WHERE timing_bridge_principal_usdc > 0), 0) AS active_bridge_usdc,
892                    COALESCE(SUM(settlement_debt_principal_usdc) FILTER (WHERE settlement_debt_principal_usdc > 0), 0) AS active_debt_usdc
893                 FROM pm_settlement_accounts",
894            )
895            .bind::<BigInt, _>(now_ms),
896            &mut conn,
897        )
898        .await?;
899        Ok(row.into())
900    }
901
902    async fn list_pm_settlement_events(
903        &self,
904    ) -> Result<Vec<hypercall_db::PmSettlementEventProjection>> {
905        let mut conn = self.get_conn().await?;
906        let rows = diesel_async::RunQueryDsl::load::<EventRow>(
907            diesel::sql_query(
908                "SELECT * FROM pm_settlement_events ORDER BY created_at DESC, event_key LIMIT 1000",
909            ),
910            &mut conn,
911        )
912        .await?;
913        rows.into_iter().map(TryInto::try_into).collect()
914    }
915
916    async fn list_pm_settlement_interest_events(
917        &self,
918    ) -> Result<Vec<hypercall_db::PmSettlementInterestEventProjection>> {
919        let mut conn = self.get_conn().await?;
920        let rows = diesel_async::RunQueryDsl::load::<InterestEventRow>(
921            diesel::sql_query(
922                "SELECT * FROM pm_settlement_interest_events ORDER BY created_at DESC, request_id LIMIT 1000",
923            ),
924            &mut conn,
925        )
926        .await?;
927        rows.into_iter().map(TryInto::try_into).collect()
928    }
929
930    async fn list_pm_settlement_repayment_events(
931        &self,
932    ) -> Result<Vec<hypercall_db::PmSettlementRepaymentEventProjection>> {
933        let mut conn = self.get_conn().await?;
934        let rows = diesel_async::RunQueryDsl::load::<RepaymentEventRow>(
935            diesel::sql_query(
936                "SELECT * FROM pm_settlement_repayment_events ORDER BY created_at DESC, request_id LIMIT 1000",
937            ),
938            &mut conn,
939        )
940        .await?;
941        rows.into_iter().map(TryInto::try_into).collect()
942    }
943
944    async fn list_pm_recovery_plans(&self) -> Result<Vec<hypercall_db::PmRecoveryPlanProjection>> {
945        let mut conn = self.get_conn().await?;
946        let rows = diesel_async::RunQueryDsl::load::<RecoveryPlanRow>(
947            diesel::sql_query(
948                "SELECT * FROM pm_recovery_plans ORDER BY created_at DESC, plan_id LIMIT 1000",
949            ),
950            &mut conn,
951        )
952        .await?;
953        rows.into_iter().map(TryInto::try_into).collect()
954    }
955
956    async fn list_pm_recovery_actions(
957        &self,
958    ) -> Result<Vec<hypercall_db::PmRecoveryActionProjection>> {
959        let mut conn = self.get_conn().await?;
960        let rows = diesel_async::RunQueryDsl::load::<RecoveryActionRow>(
961            diesel::sql_query(
962                "SELECT * FROM pm_recovery_actions ORDER BY created_at DESC, plan_id, action_id LIMIT 1000",
963            ),
964            &mut conn,
965        )
966        .await?;
967        Ok(rows.into_iter().map(Into::into).collect())
968    }
969
970    async fn pm_recovery_action_gate_counts(
971        &self,
972    ) -> Result<hypercall_db::PmRecoveryActionGateCounts> {
973        let mut conn = self.get_conn().await?;
974        let row = diesel_async::RunQueryDsl::get_result::<RecoveryActionGateCountsRow>(
975            diesel::sql_query(
976                "SELECT
977                    COUNT(*)::bigint AS total_actions,
978                    COUNT(*) FILTER (WHERE status = 'Planned')::bigint AS planned_actions,
979                    COUNT(*) FILTER (WHERE status = 'Submitted')::bigint AS submitted_actions,
980                    COUNT(*) FILTER (WHERE status IN ('Confirmed', 'Failed', 'Canceled', 'TimedOut'))::bigint AS terminal_actions
981                 FROM pm_recovery_actions",
982            ),
983            &mut conn,
984        )
985        .await?;
986        Ok(row.into())
987    }
988}
989
990impl hypercall_db::PmSettlementProjectionSyncReader for DatabaseHandler {
991    fn list_pm_settlement_pools_sync(
992        &self,
993    ) -> Result<Vec<hypercall_db::PmSettlementPoolProjection>> {
994        let mut conn = self.pool().get()?;
995        let rows =
996            diesel::sql_query("SELECT * FROM pm_settlement_pools ORDER BY underlying LIMIT 1000")
997                .load::<PoolRow>(&mut conn)?;
998        Ok(rows.into_iter().map(Into::into).collect())
999    }
1000
1001    fn list_pm_settlement_accounts_sync(
1002        &self,
1003    ) -> Result<Vec<hypercall_db::PmSettlementAccountProjection>> {
1004        let mut conn = self.pool().get()?;
1005        let rows = diesel::sql_query(
1006            "SELECT * FROM pm_settlement_accounts ORDER BY underlying, wallet LIMIT 1000",
1007        )
1008        .load::<AccountRow>(&mut conn)?;
1009        rows.into_iter().map(TryInto::try_into).collect()
1010    }
1011
1012    fn list_pm_settlement_events_sync(
1013        &self,
1014    ) -> Result<Vec<hypercall_db::PmSettlementEventProjection>> {
1015        let mut conn = self.pool().get()?;
1016        let rows = diesel::sql_query(
1017            "SELECT * FROM pm_settlement_events ORDER BY created_at DESC, event_key LIMIT 1000",
1018        )
1019        .load::<EventRow>(&mut conn)?;
1020        rows.into_iter().map(TryInto::try_into).collect()
1021    }
1022
1023    fn list_pm_settlement_interest_events_sync(
1024        &self,
1025    ) -> Result<Vec<hypercall_db::PmSettlementInterestEventProjection>> {
1026        let mut conn = self.pool().get()?;
1027        let rows = diesel::sql_query(
1028            "SELECT * FROM pm_settlement_interest_events ORDER BY created_at DESC, request_id LIMIT 1000",
1029        )
1030        .load::<InterestEventRow>(&mut conn)?;
1031        rows.into_iter().map(TryInto::try_into).collect()
1032    }
1033
1034    fn list_pm_settlement_repayment_events_sync(
1035        &self,
1036    ) -> Result<Vec<hypercall_db::PmSettlementRepaymentEventProjection>> {
1037        let mut conn = self.pool().get()?;
1038        let rows = diesel::sql_query(
1039            "SELECT * FROM pm_settlement_repayment_events ORDER BY created_at DESC, request_id LIMIT 1000",
1040        )
1041        .load::<RepaymentEventRow>(&mut conn)?;
1042        rows.into_iter().map(TryInto::try_into).collect()
1043    }
1044
1045    fn list_pm_recovery_plans_sync(&self) -> Result<Vec<hypercall_db::PmRecoveryPlanProjection>> {
1046        let mut conn = self.pool().get()?;
1047        let rows = diesel::sql_query(
1048            "SELECT * FROM pm_recovery_plans ORDER BY created_at DESC, plan_id LIMIT 1000",
1049        )
1050        .load::<RecoveryPlanRow>(&mut conn)?;
1051        rows.into_iter().map(TryInto::try_into).collect()
1052    }
1053
1054    fn list_pm_recovery_actions_sync(
1055        &self,
1056    ) -> Result<Vec<hypercall_db::PmRecoveryActionProjection>> {
1057        let mut conn = self.pool().get()?;
1058        let rows = diesel::sql_query(
1059            "SELECT * FROM pm_recovery_actions ORDER BY created_at DESC, plan_id, action_id LIMIT 1000",
1060        )
1061        .load::<RecoveryActionRow>(&mut conn)?;
1062        Ok(rows.into_iter().map(Into::into).collect())
1063    }
1064}
1065
1066impl hypercall_db::PmSettlementProjectionSyncWriter for DatabaseHandler {
1067    fn apply_pm_settlement_projection_writes_sync(
1068        &self,
1069        writes: &[hypercall_db::PmSettlementProjectionWrite],
1070    ) -> Result<()> {
1071        if writes.is_empty() {
1072            return Ok(());
1073        }
1074        let mut conn = self.pool().get()?;
1075        conn.transaction::<_, diesel::result::Error, _>(|conn| {
1076            for write in writes {
1077                apply_pm_settlement_projection_write(conn, write)?;
1078            }
1079            Ok(())
1080        })?;
1081        Ok(())
1082    }
1083}
1084
1085#[cfg(test)]
1086mod tests {
1087    use diesel::RunQueryDsl;
1088    use hypercall_db::{
1089        PmSettlementProjectionReader, PmSettlementProjectionSyncReader,
1090        PmSettlementProjectionSyncWriter,
1091    };
1092    use rust_decimal_macros::dec;
1093
1094    use super::*;
1095    use crate::test_helpers::TestDb;
1096
1097    #[tokio::test]
1098    async fn pm_settlement_projection_reader_lists_phase2_rows() {
1099        let test_db = TestDb::new().await.unwrap();
1100        let db = test_db.handler.as_ref();
1101        let wallet = hypercall_types::wallet_address::test_wallet(168);
1102        let event_request_id = uuid::Uuid::new_v4();
1103        let interest_request_id = uuid::Uuid::new_v4();
1104        let repayment_request_id = uuid::Uuid::new_v4();
1105        let mut conn = db.pool().get().unwrap();
1106
1107        diesel::sql_query(
1108            "INSERT INTO pm_settlement_pools (
1109                underlying, config_version, policy_version, pool_available_usdc,
1110                pool_target_usdc, pool_capacity_usdc, pool_utilization,
1111                active_timing_bridge_usdc, active_settlement_debt_usdc,
1112                target_short_oi_notional_multiplier, utilization_kink, apr_at_kink,
1113                max_apr, normal_utilization_cap, crisis_utilization_cap,
1114                bridge_window_ms, last_engine_command_id, projection_seq
1115             ) VALUES (
1116                'ETH', 1, 2, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, 3600000, 11, 12
1117             )",
1118        )
1119        .bind::<Numeric, _>(dec!(1000))
1120        .bind::<Numeric, _>(dec!(1500))
1121        .bind::<Numeric, _>(dec!(1200))
1122        .bind::<Numeric, _>(dec!(0.25))
1123        .bind::<Numeric, _>(dec!(100))
1124        .bind::<Numeric, _>(dec!(50))
1125        .bind::<Numeric, _>(dec!(0.20))
1126        .bind::<Numeric, _>(dec!(0.60))
1127        .bind::<Numeric, _>(dec!(0.04))
1128        .bind::<Numeric, _>(dec!(4.00))
1129        .bind::<Numeric, _>(dec!(0.80))
1130        .bind::<Numeric, _>(dec!(0.95))
1131        .execute(&mut conn)
1132        .unwrap();
1133
1134        diesel::sql_query(
1135            "INSERT INTO pm_settlement_accounts (
1136                wallet, underlying, status, timing_bridge_principal_usdc,
1137                settlement_debt_principal_usdc, accrued_interest_usdc,
1138                interest_cursor_ms, bridge_deadline_ms, active_recovery_plan_id,
1139                policy_version, last_engine_command_id, projection_seq
1140             ) VALUES ($1, 'ETH', 'Bridged', $2, $3, $4, 100, 200, 'plan-1', 2, 11, 12)",
1141        )
1142        .bind::<Bytea, _>(wallet.as_bytes())
1143        .bind::<Numeric, _>(dec!(75))
1144        .bind::<Numeric, _>(dec!(0))
1145        .bind::<Numeric, _>(dec!(3))
1146        .execute(&mut conn)
1147        .unwrap();
1148
1149        diesel::sql_query(
1150            "INSERT INTO pm_settlement_events (
1151                event_key, wallet, underlying, event_type, amount_usdc,
1152                request_id, input_digest, engine_command_id, projection_seq
1153             ) VALUES ('event-1', $1, 'ETH', 'TimingBridge', $2, $3, 'digest-1', 11, 12)",
1154        )
1155        .bind::<Bytea, _>(wallet.as_bytes())
1156        .bind::<Numeric, _>(dec!(75))
1157        .bind::<SqlUuid, _>(DbUuid(event_request_id))
1158        .execute(&mut conn)
1159        .unwrap();
1160
1161        diesel::sql_query(
1162            "INSERT INTO pm_settlement_interest_events (
1163                request_id, wallet, underlying, from_ms, to_ms, utilization, apr,
1164                interest_usdc, policy_version, engine_command_id, projection_seq
1165             ) VALUES ($1, $2, 'ETH', 100, 200, $3, $4, $5, 2, 11, 12)",
1166        )
1167        .bind::<SqlUuid, _>(DbUuid(interest_request_id))
1168        .bind::<Bytea, _>(wallet.as_bytes())
1169        .bind::<Numeric, _>(dec!(0.25))
1170        .bind::<Numeric, _>(dec!(0.04))
1171        .bind::<Numeric, _>(dec!(3))
1172        .execute(&mut conn)
1173        .unwrap();
1174
1175        diesel::sql_query(
1176            "INSERT INTO pm_settlement_repayment_events (
1177                request_id, wallet, underlying, amount_usdc, interest_paid_usdc,
1178                principal_paid_usdc, reason, source_event_id, engine_command_id, projection_seq
1179             ) VALUES ($1, $2, 'ETH', $3, $4, $5, 'manual', 'event-1', 11, 12)",
1180        )
1181        .bind::<SqlUuid, _>(DbUuid(repayment_request_id))
1182        .bind::<Bytea, _>(wallet.as_bytes())
1183        .bind::<Numeric, _>(dec!(10))
1184        .bind::<Numeric, _>(dec!(3))
1185        .bind::<Numeric, _>(dec!(7))
1186        .execute(&mut conn)
1187        .unwrap();
1188
1189        diesel::sql_query(
1190            "INSERT INTO pm_recovery_plans (
1191                plan_id, wallet, underlying, status, placeholder, engine_command_id, projection_seq
1192             ) VALUES ('plan-1', $1, 'ETH', 'Placeholder', true, 11, 12)",
1193        )
1194        .bind::<Bytea, _>(wallet.as_bytes())
1195        .execute(&mut conn)
1196        .unwrap();
1197
1198        diesel::sql_query(
1199            "INSERT INTO pm_recovery_actions (
1200                plan_id, action_id, action_type, status, engine_command_id, projection_seq
1201             ) VALUES ('plan-1', 'action-1', 'Placeholder', 'Pending', 11, 12)",
1202        )
1203        .execute(&mut conn)
1204        .unwrap();
1205
1206        let pools = db.list_pm_settlement_pools_sync().unwrap();
1207        assert_eq!(pools.len(), 1);
1208        assert_eq!(pools[0].pool_available_usdc, dec!(1000));
1209        assert_eq!(pools[0].pool_capacity_usdc, dec!(1200));
1210        let accounts = db.list_pm_settlement_accounts_sync().unwrap();
1211        assert_eq!(accounts[0].wallet, wallet);
1212        assert_eq!(accounts[0].timing_bridge_principal_usdc, dec!(75));
1213        assert_eq!(accounts[0].bridge_deadline_ms, Some(200));
1214        assert_eq!(
1215            accounts[0].active_recovery_plan_id.as_deref(),
1216            Some("plan-1")
1217        );
1218        assert_eq!(db.list_pm_settlement_events_sync().unwrap().len(), 1);
1219        assert_eq!(
1220            db.list_pm_settlement_interest_events_sync().unwrap()[0].request_id,
1221            interest_request_id
1222        );
1223        assert_eq!(
1224            db.list_pm_settlement_interest_events_sync().unwrap()[0].interest_usdc,
1225            dec!(3)
1226        );
1227        assert_eq!(
1228            db.list_pm_settlement_repayment_events_sync().unwrap()[0].request_id,
1229            repayment_request_id
1230        );
1231        assert_eq!(
1232            db.list_pm_settlement_repayment_events_sync().unwrap()[0].principal_paid_usdc,
1233            dec!(7)
1234        );
1235        assert_eq!(db.list_pm_recovery_plans_sync().unwrap().len(), 1);
1236        assert_eq!(db.list_pm_recovery_actions_sync().unwrap().len(), 1);
1237
1238        let async_db = test_db.diesel_db().await;
1239        assert_eq!(async_db.list_pm_settlement_pools().await.unwrap().len(), 1);
1240        assert_eq!(
1241            async_db.list_pm_settlement_accounts().await.unwrap()[0].wallet,
1242            wallet
1243        );
1244        assert_eq!(async_db.list_pm_settlement_events().await.unwrap().len(), 1);
1245        assert_eq!(
1246            async_db.list_pm_settlement_interest_events().await.unwrap()[0].interest_usdc,
1247            dec!(3)
1248        );
1249        assert_eq!(
1250            async_db
1251                .list_pm_settlement_repayment_events()
1252                .await
1253                .unwrap()[0]
1254                .principal_paid_usdc,
1255            dec!(7)
1256        );
1257        assert_eq!(async_db.list_pm_recovery_plans().await.unwrap().len(), 1);
1258        assert_eq!(async_db.list_pm_recovery_actions().await.unwrap().len(), 1);
1259    }
1260
1261    #[tokio::test]
1262    async fn pm_settlement_gate_counts_are_not_display_list_limited() {
1263        let test_db = TestDb::new().await.unwrap();
1264        let db = test_db.handler.as_ref();
1265        let mut conn = db.pool().get().unwrap();
1266
1267        diesel::sql_query(
1268            "INSERT INTO pm_settlement_pools (
1269                underlying, config_version, policy_version, pool_available_usdc,
1270                pool_target_usdc, pool_capacity_usdc, pool_utilization,
1271                active_timing_bridge_usdc, active_settlement_debt_usdc,
1272                normal_utilization_cap, crisis_utilization_cap,
1273                last_engine_command_id, projection_seq
1274             )
1275             SELECT
1276                'POOL' || gs::text,
1277                1,
1278                1,
1279                1,
1280                2,
1281                10,
1282                0.96,
1283                0,
1284                0,
1285                0.80,
1286                0.95,
1287                gs,
1288                gs
1289             FROM generate_series(1, 1001) AS gs",
1290        )
1291        .execute(&mut conn)
1292        .unwrap();
1293
1294        diesel::sql_query(
1295            "INSERT INTO pm_settlement_accounts (
1296                wallet, underlying, status, timing_bridge_principal_usdc,
1297                settlement_debt_principal_usdc, accrued_interest_usdc,
1298                interest_cursor_ms, bridge_deadline_ms, active_recovery_plan_id,
1299                policy_version, last_engine_command_id, projection_seq
1300             )
1301             SELECT
1302                decode(lpad(to_hex(gs), 40, '0'), 'hex'),
1303                'ETH',
1304                'Debt',
1305                1,
1306                2,
1307                0,
1308                100,
1309                200,
1310                'plan-' || gs::text,
1311                1,
1312                gs,
1313                gs
1314             FROM generate_series(1, 1001) AS gs",
1315        )
1316        .execute(&mut conn)
1317        .unwrap();
1318
1319        diesel::sql_query(
1320            "INSERT INTO pm_recovery_plans (
1321                plan_id, wallet, underlying, status, placeholder, engine_command_id, projection_seq
1322             )
1323             SELECT
1324                'plan-' || gs::text,
1325                decode(lpad(to_hex(gs), 40, '0'), 'hex'),
1326                'ETH',
1327                'Planned',
1328                false,
1329                gs,
1330                gs
1331             FROM generate_series(1, 1001) AS gs",
1332        )
1333        .execute(&mut conn)
1334        .unwrap();
1335
1336        diesel::sql_query(
1337            "INSERT INTO pm_recovery_actions (
1338                plan_id, action_id, action_type, status, engine_command_id, projection_seq
1339             )
1340             SELECT
1341                'plan-' || gs::text,
1342                'action-' || gs::text,
1343                'Transfer',
1344                'Submitted',
1345                gs,
1346                gs
1347             FROM generate_series(1, 1001) AS gs",
1348        )
1349        .execute(&mut conn)
1350        .unwrap();
1351
1352        let async_db = test_db.diesel_db().await;
1353        assert_eq!(
1354            async_db.list_pm_settlement_pools().await.unwrap().len(),
1355            1000
1356        );
1357        assert_eq!(
1358            async_db.list_pm_settlement_accounts().await.unwrap().len(),
1359            1000
1360        );
1361        assert_eq!(
1362            async_db.list_pm_recovery_actions().await.unwrap().len(),
1363            1000
1364        );
1365
1366        let pool_counts = async_db.pm_settlement_pool_gate_counts().await.unwrap();
1367        assert_eq!(pool_counts.total_pools, 1001);
1368        assert_eq!(pool_counts.below_target_pools, 1001);
1369        assert_eq!(pool_counts.above_crisis_cap_pools, 1001);
1370        assert_eq!(pool_counts.missing_utilization_pools, 0);
1371        assert_eq!(pool_counts.missing_crisis_cap_pools, 0);
1372
1373        let account_counts = async_db
1374            .pm_settlement_account_gate_counts(300)
1375            .await
1376            .unwrap();
1377        assert_eq!(account_counts.total_accounts, 1001);
1378        assert_eq!(account_counts.bridged_accounts, 1001);
1379        assert_eq!(account_counts.debt_accounts, 1001);
1380        assert_eq!(account_counts.overdue_bridge_accounts, 1001);
1381        assert_eq!(account_counts.active_recovery_accounts, 1001);
1382        assert_eq!(account_counts.active_bridge_usdc, dec!(1001));
1383        assert_eq!(account_counts.active_debt_usdc, dec!(2002));
1384
1385        let action_counts = async_db.pm_recovery_action_gate_counts().await.unwrap();
1386        assert_eq!(action_counts.total_actions, 1001);
1387        assert_eq!(action_counts.submitted_actions, 1001);
1388    }
1389
1390    #[tokio::test]
1391    async fn pm_settlement_gate_counts_fail_closed_for_unknown_bridge_deadlines() {
1392        let test_db = TestDb::new().await.unwrap();
1393        let db = test_db.handler.as_ref();
1394        let mut conn = db.pool().get().unwrap();
1395        let wallet = hypercall_types::wallet_address::test_wallet(170);
1396
1397        diesel::sql_query(
1398            "INSERT INTO pm_settlement_accounts (
1399                wallet, underlying, status, timing_bridge_principal_usdc,
1400                settlement_debt_principal_usdc, accrued_interest_usdc,
1401                interest_cursor_ms, bridge_deadline_ms, active_recovery_plan_id,
1402                policy_version, last_engine_command_id, projection_seq
1403             ) VALUES ($1, 'ETH', 'Bridged', $2, 0, 0, 100, NULL, NULL, 1, 1, 1)",
1404        )
1405        .bind::<Bytea, _>(wallet.as_bytes())
1406        .bind::<Numeric, _>(dec!(10))
1407        .execute(&mut conn)
1408        .unwrap();
1409
1410        let async_db = test_db.diesel_db().await;
1411        let account_counts = async_db
1412            .pm_settlement_account_gate_counts(300)
1413            .await
1414            .unwrap();
1415        assert_eq!(account_counts.bridged_accounts, 1);
1416        assert_eq!(account_counts.overdue_bridge_accounts, 1);
1417    }
1418
1419    #[tokio::test]
1420    async fn pm_settlement_gate_counts_ignore_repaid_recovery_markers() {
1421        let test_db = TestDb::new().await.unwrap();
1422        let db = test_db.handler.as_ref();
1423        let mut conn = db.pool().get().unwrap();
1424        let active_wallet = hypercall_types::wallet_address::test_wallet(171);
1425        let repaid_wallet = hypercall_types::wallet_address::test_wallet(172);
1426
1427        diesel::sql_query(
1428            "INSERT INTO pm_settlement_accounts (
1429                wallet, underlying, status, timing_bridge_principal_usdc,
1430                settlement_debt_principal_usdc, accrued_interest_usdc,
1431                interest_cursor_ms, bridge_deadline_ms, active_recovery_plan_id,
1432                policy_version, last_engine_command_id, projection_seq
1433             ) VALUES
1434                ($1, 'ETH', 'Debt', 0, $2, 0, 100, NULL, 'active-plan', 1, 1, 1),
1435                ($3, 'ETH', 'Current', 0, 0, 0, 100, NULL, 'repaid-plan', 1, 2, 2)",
1436        )
1437        .bind::<Bytea, _>(active_wallet.as_bytes())
1438        .bind::<Numeric, _>(dec!(10))
1439        .bind::<Bytea, _>(repaid_wallet.as_bytes())
1440        .execute(&mut conn)
1441        .unwrap();
1442
1443        let async_db = test_db.diesel_db().await;
1444        let account_counts = async_db
1445            .pm_settlement_account_gate_counts(300)
1446            .await
1447            .unwrap();
1448        assert_eq!(account_counts.total_accounts, 2);
1449        assert_eq!(account_counts.debt_accounts, 1);
1450        assert_eq!(account_counts.active_recovery_accounts, 1);
1451    }
1452
1453    #[tokio::test]
1454    async fn pm_settlement_projection_writer_is_idempotent_and_transactional() {
1455        let test_db = TestDb::new().await.unwrap();
1456        let db = test_db.handler.as_ref();
1457        let wallet = hypercall_types::wallet_address::test_wallet(169);
1458        let interest_request_id = uuid::Uuid::new_v4();
1459
1460        let pool_write = hypercall_db::PmSettlementProjectionWrite::Pool(
1461            hypercall_db::PmSettlementPoolProjectionWrite {
1462                underlying: "ETH".to_string(),
1463                config_version: 1,
1464                policy_version: 1,
1465                pool_available_usdc: dec!(1000),
1466                pool_target_usdc: dec!(1500),
1467                pool_capacity_usdc: dec!(1100),
1468                pool_utilization: Some(dec!(0.10)),
1469                active_timing_bridge_usdc: dec!(100),
1470                active_settlement_debt_usdc: dec!(0),
1471                target_short_oi_notional_multiplier: Some(dec!(0.20)),
1472                utilization_kink: Some(dec!(0.60)),
1473                apr_at_kink: Some(dec!(0.04)),
1474                max_apr: Some(dec!(4.00)),
1475                normal_utilization_cap: Some(dec!(0.80)),
1476                crisis_utilization_cap: Some(dec!(0.95)),
1477                bridge_window_ms: Some(3_600_000),
1478                projection_seq: 100,
1479                updated_at_ms: 100,
1480            },
1481        );
1482        let account_write = hypercall_db::PmSettlementProjectionWrite::Account(
1483            hypercall_db::PmSettlementAccountProjectionWrite {
1484                wallet,
1485                underlying: "ETH".to_string(),
1486                status: "Bridged".to_string(),
1487                timing_bridge_principal_usdc: dec!(50),
1488                settlement_debt_principal_usdc: dec!(0),
1489                accrued_interest_usdc: dec!(2),
1490                interest_cursor_ms: 100,
1491                bridge_deadline_ms: Some(200),
1492                active_recovery_plan_id: Some("plan-169".to_string()),
1493                policy_version: 1,
1494                projection_seq: 100,
1495                updated_at_ms: 100,
1496            },
1497        );
1498        let interest_write = hypercall_db::PmSettlementProjectionWrite::InterestEvent(
1499            hypercall_db::PmSettlementInterestEventProjectionWrite {
1500                request_id: interest_request_id,
1501                wallet,
1502                underlying: "ETH".to_string(),
1503                from_ms: 100,
1504                to_ms: 200,
1505                utilization: dec!(0.10),
1506                apr: dec!(0.04),
1507                interest_usdc: dec!(2),
1508                policy_version: 1,
1509            },
1510        );
1511        let recovery_write = hypercall_db::PmSettlementProjectionWrite::RecoveryPlan(
1512            hypercall_db::PmRecoveryPlanProjectionWrite {
1513                plan_id: "plan-169".to_string(),
1514                wallet,
1515                underlying: "ETH".to_string(),
1516                status: "Confirmed".to_string(),
1517                trigger: "UtilizationBreach".to_string(),
1518                reason: "PoolUtilizationBreach".to_string(),
1519                policy_version: 1,
1520                recovery_priority_version: 1,
1521                target_reduction_usdc: dec!(10),
1522                expected_usdc_recovered: dec!(10),
1523                expected_obligation_reduced: dec!(0),
1524                expected_impact_usdc: dec!(1),
1525                post_plan_utilization: Some(dec!(0.09)),
1526                projection_seq: 300,
1527                updated_at_ms: 300,
1528                actions: vec![hypercall_db::PmRecoveryActionProjectionWrite {
1529                    plan_id: "plan-169".to_string(),
1530                    action_id: "plan-169:0".to_string(),
1531                    action_type: "CancelOrder".to_string(),
1532                    status: "Confirmed".to_string(),
1533                    target: "order-1".to_string(),
1534                    attempt: 1,
1535                    external_id: Some("directive-1".to_string()),
1536                    external_kind: Some("Directive".to_string()),
1537                    result_external_id: Some("fill-1".to_string()),
1538                    result: Some("Confirmed".to_string()),
1539                    expected_usdc_recovered: dec!(10),
1540                    expected_obligation_reduced: dec!(0),
1541                    expected_impact_usdc: dec!(1),
1542                    recovered_usdc: dec!(10),
1543                    liability_reduction_usdc: dec!(0),
1544                    projection_seq: 300,
1545                    updated_at_ms: 300,
1546                }],
1547            },
1548        );
1549        let deposit_id = uuid::Uuid::new_v4();
1550        let withdrawal_id = uuid::Uuid::new_v4();
1551        let vault_deposit_write = hypercall_db::PmSettlementProjectionWrite::VaultDeposit(
1552            hypercall_db::PmVaultDepositProjectionWrite {
1553                deposit_id,
1554                depositor: wallet,
1555                underlying: "ETH".to_string(),
1556                principal_usdc: dec!(500),
1557                remaining_usdc: dec!(400),
1558                withdrawn_usdc: dec!(0),
1559                reserved_withdrawal_usdc: dec!(100),
1560                chain_id: 42161,
1561                source_contract_address: wallet,
1562                tx_hash: "0xdeposit".to_string(),
1563                log_index: 7,
1564                max_listed_expiry_ts_ms: 1_800_000_000_000,
1565                settlement_grace_ms: 86_400_000,
1566                lock_until_ms: 1_800_086_400_000,
1567                status: hypercall_db::PmVaultDepositProjectionStatus::PartiallyReserved,
1568                projection_seq: 400,
1569                created_at_ms: 300,
1570                updated_at_ms: 400,
1571            },
1572        );
1573        let vault_withdrawal_write = hypercall_db::PmSettlementProjectionWrite::VaultWithdrawal(
1574            hypercall_db::PmVaultWithdrawalProjectionWrite {
1575                withdrawal_id,
1576                deposit_id,
1577                depositor: wallet,
1578                underlying: "ETH".to_string(),
1579                amount_usdc: dec!(100),
1580                lock_until_ms: 1_800_086_400_000,
1581                status: hypercall_db::PmVaultWithdrawalProjectionStatus::Reserved,
1582                projection_seq: 400,
1583                requested_at_ms: 1_800_086_400_000,
1584                updated_at_ms: 1_800_086_400_000,
1585            },
1586        );
1587
1588        db.apply_pm_settlement_projection_writes_sync(&[
1589            pool_write.clone(),
1590            account_write.clone(),
1591            interest_write.clone(),
1592            recovery_write.clone(),
1593            vault_deposit_write.clone(),
1594            vault_withdrawal_write.clone(),
1595        ])
1596        .unwrap();
1597        db.apply_pm_settlement_projection_writes_sync(&[
1598            pool_write.clone(),
1599            account_write.clone(),
1600            interest_write,
1601            recovery_write,
1602            vault_deposit_write,
1603            vault_withdrawal_write,
1604        ])
1605        .unwrap();
1606
1607        assert_eq!(db.list_pm_settlement_pools_sync().unwrap().len(), 1);
1608        let accounts = db.list_pm_settlement_accounts_sync().unwrap();
1609        assert_eq!(accounts.len(), 1);
1610        assert_eq!(accounts[0].bridge_deadline_ms, Some(200));
1611        assert_eq!(
1612            accounts[0].active_recovery_plan_id.as_deref(),
1613            Some("plan-169")
1614        );
1615
1616        let stale_account_write = hypercall_db::PmSettlementProjectionWrite::Account(
1617            hypercall_db::PmSettlementAccountProjectionWrite {
1618                wallet,
1619                underlying: "ETH".to_string(),
1620                status: "Debt".to_string(),
1621                timing_bridge_principal_usdc: dec!(0),
1622                settlement_debt_principal_usdc: dec!(999),
1623                accrued_interest_usdc: dec!(999),
1624                interest_cursor_ms: 1,
1625                bridge_deadline_ms: None,
1626                active_recovery_plan_id: None,
1627                policy_version: 1,
1628                projection_seq: 99,
1629                updated_at_ms: 99,
1630            },
1631        );
1632        db.apply_pm_settlement_projection_writes_sync(&[stale_account_write])
1633            .unwrap();
1634        let accounts = db.list_pm_settlement_accounts_sync().unwrap();
1635        assert_eq!(accounts[0].status, "Bridged");
1636        assert_eq!(accounts[0].timing_bridge_principal_usdc, dec!(50));
1637        assert_eq!(accounts[0].settlement_debt_principal_usdc, dec!(0));
1638        assert_eq!(accounts[0].accrued_interest_usdc, dec!(2));
1639        assert_eq!(accounts[0].interest_cursor_ms, 100);
1640        assert_eq!(accounts[0].bridge_deadline_ms, Some(200));
1641        assert_eq!(
1642            accounts[0].active_recovery_plan_id.as_deref(),
1643            Some("plan-169")
1644        );
1645        assert_eq!(accounts[0].projection_seq, 100);
1646
1647        let stale_pool_write = hypercall_db::PmSettlementProjectionWrite::Pool(
1648            hypercall_db::PmSettlementPoolProjectionWrite {
1649                underlying: "ETH".to_string(),
1650                config_version: 1,
1651                policy_version: 1,
1652                pool_available_usdc: dec!(999),
1653                pool_target_usdc: dec!(999),
1654                pool_capacity_usdc: dec!(999),
1655                pool_utilization: Some(dec!(0.99)),
1656                active_timing_bridge_usdc: dec!(999),
1657                active_settlement_debt_usdc: dec!(999),
1658                target_short_oi_notional_multiplier: Some(dec!(0.99)),
1659                utilization_kink: Some(dec!(0.99)),
1660                apr_at_kink: Some(dec!(0.99)),
1661                max_apr: Some(dec!(9.99)),
1662                normal_utilization_cap: Some(dec!(0.99)),
1663                crisis_utilization_cap: Some(dec!(0.99)),
1664                bridge_window_ms: Some(99),
1665                projection_seq: 99,
1666                updated_at_ms: 99,
1667            },
1668        );
1669        db.apply_pm_settlement_projection_writes_sync(&[stale_pool_write])
1670            .unwrap();
1671        let pools = db.list_pm_settlement_pools_sync().unwrap();
1672        assert_eq!(pools[0].pool_available_usdc, dec!(1000));
1673        assert_eq!(pools[0].pool_target_usdc, dec!(1500));
1674        assert_eq!(pools[0].pool_capacity_usdc, dec!(1100));
1675        assert_eq!(pools[0].pool_utilization, Some(dec!(0.10)));
1676        assert_eq!(pools[0].active_timing_bridge_usdc, dec!(100));
1677        assert_eq!(pools[0].projection_seq, 100);
1678
1679        let interest_events = db.list_pm_settlement_interest_events_sync().unwrap();
1680        assert_eq!(interest_events.len(), 1);
1681        assert_eq!(interest_events[0].request_id, interest_request_id);
1682        let recovery_actions = db.list_pm_recovery_actions_sync().unwrap();
1683        assert_eq!(recovery_actions.len(), 1);
1684        assert_eq!(
1685            recovery_actions[0].result_external_id.as_deref(),
1686            Some("fill-1")
1687        );
1688        #[derive(QueryableByName)]
1689        struct VaultProjectionCounts {
1690            #[diesel(sql_type = BigInt)]
1691            deposits: i64,
1692            #[diesel(sql_type = BigInt)]
1693            withdrawals: i64,
1694        }
1695        #[derive(QueryableByName)]
1696        struct VaultDepositProjectionRow {
1697            #[diesel(sql_type = Numeric)]
1698            remaining_usdc: rust_decimal::Decimal,
1699            #[diesel(sql_type = Numeric)]
1700            withdrawn_usdc: rust_decimal::Decimal,
1701            #[diesel(sql_type = Numeric)]
1702            reserved_withdrawal_usdc: rust_decimal::Decimal,
1703            #[diesel(sql_type = Text)]
1704            status: String,
1705            #[diesel(sql_type = BigInt)]
1706            projection_seq: i64,
1707        }
1708        let mut conn = db.pool().get().unwrap();
1709        let vault_counts: VaultProjectionCounts = diesel::sql_query(
1710            "SELECT
1711                (SELECT COUNT(*) FROM pm_vault_deposits)::bigint AS deposits,
1712                (SELECT COUNT(*) FROM pm_vault_withdrawals)::bigint AS withdrawals",
1713        )
1714        .get_result(&mut conn)
1715        .unwrap();
1716        assert_eq!(vault_counts.deposits, 1);
1717        assert_eq!(vault_counts.withdrawals, 1);
1718
1719        let stale_vault_deposit_write = hypercall_db::PmSettlementProjectionWrite::VaultDeposit(
1720            hypercall_db::PmVaultDepositProjectionWrite {
1721                deposit_id,
1722                depositor: wallet,
1723                underlying: "ETH".to_string(),
1724                principal_usdc: dec!(500),
1725                remaining_usdc: dec!(0),
1726                withdrawn_usdc: dec!(500),
1727                reserved_withdrawal_usdc: dec!(0),
1728                chain_id: 42161,
1729                source_contract_address: wallet,
1730                tx_hash: "0xdeposit".to_string(),
1731                log_index: 7,
1732                max_listed_expiry_ts_ms: 1_800_000_000_000,
1733                settlement_grace_ms: 86_400_000,
1734                lock_until_ms: 1_800_086_400_000,
1735                status: hypercall_db::PmVaultDepositProjectionStatus::Reserved,
1736                projection_seq: 399,
1737                created_at_ms: 300,
1738                updated_at_ms: 399,
1739            },
1740        );
1741        db.apply_pm_settlement_projection_writes_sync(&[stale_vault_deposit_write])
1742            .unwrap();
1743        let stale_checked_deposit: VaultDepositProjectionRow = diesel::sql_query(
1744            "SELECT
1745                remaining_usdc,
1746                withdrawn_usdc,
1747                reserved_withdrawal_usdc,
1748                status::text AS status,
1749                projection_seq
1750             FROM pm_vault_deposits
1751             WHERE deposit_id = $1",
1752        )
1753        .bind::<SqlUuid, _>(DbUuid(deposit_id))
1754        .get_result(&mut conn)
1755        .unwrap();
1756        assert_eq!(stale_checked_deposit.remaining_usdc, dec!(400));
1757        assert_eq!(stale_checked_deposit.withdrawn_usdc, dec!(0));
1758        assert_eq!(stale_checked_deposit.reserved_withdrawal_usdc, dec!(100));
1759        assert_eq!(stale_checked_deposit.status, "PartiallyReserved");
1760        assert_eq!(stale_checked_deposit.projection_seq, 400);
1761
1762        let invalid_repayment = hypercall_db::PmSettlementProjectionWrite::RepaymentEvent(
1763            hypercall_db::PmSettlementRepaymentEventProjectionWrite {
1764                request_id: uuid::Uuid::new_v4(),
1765                wallet,
1766                underlying: "BTC".to_string(),
1767                amount_usdc: dec!(-1),
1768                interest_paid_usdc: dec!(0),
1769                principal_paid_usdc: dec!(0),
1770                reason: "manual".to_string(),
1771                source_event_id: "event-rollback".to_string(),
1772            },
1773        );
1774        let rollback_pool = hypercall_db::PmSettlementProjectionWrite::Pool(
1775            hypercall_db::PmSettlementPoolProjectionWrite {
1776                underlying: "BTC".to_string(),
1777                config_version: 1,
1778                policy_version: 1,
1779                pool_available_usdc: dec!(100),
1780                pool_target_usdc: dec!(100),
1781                pool_capacity_usdc: dec!(100),
1782                pool_utilization: Some(dec!(0)),
1783                active_timing_bridge_usdc: dec!(0),
1784                active_settlement_debt_usdc: dec!(0),
1785                target_short_oi_notional_multiplier: Some(dec!(0.20)),
1786                utilization_kink: Some(dec!(0.60)),
1787                apr_at_kink: Some(dec!(0.04)),
1788                max_apr: Some(dec!(4.00)),
1789                normal_utilization_cap: Some(dec!(0.80)),
1790                crisis_utilization_cap: Some(dec!(0.95)),
1791                bridge_window_ms: Some(3_600_000),
1792                projection_seq: 200,
1793                updated_at_ms: 200,
1794            },
1795        );
1796
1797        let result =
1798            db.apply_pm_settlement_projection_writes_sync(&[rollback_pool, invalid_repayment]);
1799        assert!(result.is_err());
1800        let pools = db.list_pm_settlement_pools_sync().unwrap();
1801        assert_eq!(pools.len(), 1);
1802        assert_eq!(pools[0].underlying, "ETH");
1803        assert!(db
1804            .list_pm_settlement_repayment_events_sync()
1805            .unwrap()
1806            .is_empty());
1807    }
1808}