1use anyhow::{anyhow, Result};
2use diesel::pg::PgConnection;
3use diesel::prelude::*;
4use diesel::sql_types::{
5 BigInt, Bool, Bytea, Integer, Nullable, Numeric, Text, Timestamptz, Uuid as SqlUuid,
6};
7
8use hypercall_types::WalletAddress;
9
10use crate::database_handler::DatabaseHandler;
11use crate::diesel_db::DieselDb;
12use crate::engine_enums::DbUuid;
13
14fn wallet_from_bytes(bytes: Vec<u8>) -> Result<WalletAddress> {
15 let raw: [u8; 20] = bytes
16 .try_into()
17 .map_err(|bytes: Vec<u8>| anyhow!("invalid wallet byte length {}", bytes.len()))?;
18 Ok(WalletAddress::from(raw))
19}
20
21#[derive(QueryableByName)]
22struct PoolRow {
23 #[diesel(sql_type = Text)]
24 underlying: String,
25 #[diesel(sql_type = Integer)]
26 config_version: i32,
27 #[diesel(sql_type = Integer)]
28 policy_version: i32,
29 #[diesel(sql_type = Numeric)]
30 pool_available_usdc: rust_decimal::Decimal,
31 #[diesel(sql_type = Numeric)]
32 pool_target_usdc: rust_decimal::Decimal,
33 #[diesel(sql_type = Numeric)]
34 pool_capacity_usdc: rust_decimal::Decimal,
35 #[diesel(sql_type = Nullable<Numeric>)]
36 pool_utilization: Option<rust_decimal::Decimal>,
37 #[diesel(sql_type = Numeric)]
38 active_timing_bridge_usdc: rust_decimal::Decimal,
39 #[diesel(sql_type = Numeric)]
40 active_settlement_debt_usdc: rust_decimal::Decimal,
41 #[diesel(sql_type = Nullable<Numeric>)]
42 target_short_oi_notional_multiplier: Option<rust_decimal::Decimal>,
43 #[diesel(sql_type = Nullable<Numeric>)]
44 utilization_kink: Option<rust_decimal::Decimal>,
45 #[diesel(sql_type = Nullable<Numeric>)]
46 apr_at_kink: Option<rust_decimal::Decimal>,
47 #[diesel(sql_type = Nullable<Numeric>)]
48 max_apr: Option<rust_decimal::Decimal>,
49 #[diesel(sql_type = Nullable<Numeric>)]
50 normal_utilization_cap: Option<rust_decimal::Decimal>,
51 #[diesel(sql_type = Nullable<Numeric>)]
52 crisis_utilization_cap: Option<rust_decimal::Decimal>,
53 #[diesel(sql_type = Nullable<BigInt>)]
54 bridge_window_ms: Option<i64>,
55 #[diesel(sql_type = BigInt)]
56 last_engine_command_id: i64,
57 #[diesel(sql_type = BigInt)]
58 projection_seq: i64,
59 #[diesel(sql_type = Timestamptz)]
60 updated_at: chrono::DateTime<chrono::Utc>,
61}
62
63impl From<PoolRow> for hypercall_db::PmSettlementPoolProjection {
64 fn from(row: PoolRow) -> Self {
65 Self {
66 underlying: row.underlying,
67 config_version: row.config_version,
68 policy_version: row.policy_version,
69 pool_available_usdc: row.pool_available_usdc,
70 pool_target_usdc: row.pool_target_usdc,
71 pool_capacity_usdc: row.pool_capacity_usdc,
72 pool_utilization: row.pool_utilization,
73 active_timing_bridge_usdc: row.active_timing_bridge_usdc,
74 active_settlement_debt_usdc: row.active_settlement_debt_usdc,
75 target_short_oi_notional_multiplier: row.target_short_oi_notional_multiplier,
76 utilization_kink: row.utilization_kink,
77 apr_at_kink: row.apr_at_kink,
78 max_apr: row.max_apr,
79 normal_utilization_cap: row.normal_utilization_cap,
80 crisis_utilization_cap: row.crisis_utilization_cap,
81 bridge_window_ms: row.bridge_window_ms,
82 last_engine_command_id: row.last_engine_command_id,
83 projection_seq: row.projection_seq,
84 updated_at: row.updated_at,
85 }
86 }
87}
88
89#[derive(QueryableByName)]
90struct PoolGateCountsRow {
91 #[diesel(sql_type = BigInt)]
92 total_pools: i64,
93 #[diesel(sql_type = BigInt)]
94 below_target_pools: i64,
95 #[diesel(sql_type = BigInt)]
96 above_crisis_cap_pools: i64,
97 #[diesel(sql_type = BigInt)]
98 missing_utilization_pools: i64,
99 #[diesel(sql_type = BigInt)]
100 missing_crisis_cap_pools: i64,
101}
102
103impl From<PoolGateCountsRow> for hypercall_db::PmSettlementPoolGateCounts {
104 fn from(row: PoolGateCountsRow) -> Self {
105 Self {
106 total_pools: row.total_pools,
107 below_target_pools: row.below_target_pools,
108 above_crisis_cap_pools: row.above_crisis_cap_pools,
109 missing_utilization_pools: row.missing_utilization_pools,
110 missing_crisis_cap_pools: row.missing_crisis_cap_pools,
111 }
112 }
113}
114
115#[derive(QueryableByName)]
116struct AccountRow {
117 #[diesel(sql_type = Bytea)]
118 wallet: Vec<u8>,
119 #[diesel(sql_type = Text)]
120 underlying: String,
121 #[diesel(sql_type = Text)]
122 status: String,
123 #[diesel(sql_type = Numeric)]
124 timing_bridge_principal_usdc: rust_decimal::Decimal,
125 #[diesel(sql_type = Numeric)]
126 settlement_debt_principal_usdc: rust_decimal::Decimal,
127 #[diesel(sql_type = Numeric)]
128 accrued_interest_usdc: rust_decimal::Decimal,
129 #[diesel(sql_type = BigInt)]
130 interest_cursor_ms: i64,
131 #[diesel(sql_type = Nullable<BigInt>)]
132 bridge_deadline_ms: Option<i64>,
133 #[diesel(sql_type = Nullable<Text>)]
134 active_recovery_plan_id: Option<String>,
135 #[diesel(sql_type = Integer)]
136 policy_version: i32,
137 #[diesel(sql_type = BigInt)]
138 last_engine_command_id: i64,
139 #[diesel(sql_type = BigInt)]
140 projection_seq: i64,
141 #[diesel(sql_type = Timestamptz)]
142 updated_at: chrono::DateTime<chrono::Utc>,
143}
144
145impl TryFrom<AccountRow> for hypercall_db::PmSettlementAccountProjection {
146 type Error = anyhow::Error;
147
148 fn try_from(row: AccountRow) -> Result<Self> {
149 Ok(Self {
150 wallet: wallet_from_bytes(row.wallet)?,
151 underlying: row.underlying,
152 status: row.status,
153 timing_bridge_principal_usdc: row.timing_bridge_principal_usdc,
154 settlement_debt_principal_usdc: row.settlement_debt_principal_usdc,
155 accrued_interest_usdc: row.accrued_interest_usdc,
156 interest_cursor_ms: row.interest_cursor_ms,
157 bridge_deadline_ms: row.bridge_deadline_ms,
158 active_recovery_plan_id: row.active_recovery_plan_id,
159 policy_version: row.policy_version,
160 last_engine_command_id: row.last_engine_command_id,
161 projection_seq: row.projection_seq,
162 updated_at: row.updated_at,
163 })
164 }
165}
166
167#[derive(QueryableByName)]
168struct AccountGateCountsRow {
169 #[diesel(sql_type = BigInt)]
170 total_accounts: i64,
171 #[diesel(sql_type = BigInt)]
172 bridged_accounts: i64,
173 #[diesel(sql_type = BigInt)]
174 debt_accounts: i64,
175 #[diesel(sql_type = BigInt)]
176 overdue_bridge_accounts: i64,
177 #[diesel(sql_type = BigInt)]
178 active_recovery_accounts: i64,
179 #[diesel(sql_type = Numeric)]
180 active_bridge_usdc: rust_decimal::Decimal,
181 #[diesel(sql_type = Numeric)]
182 active_debt_usdc: rust_decimal::Decimal,
183}
184
185impl From<AccountGateCountsRow> for hypercall_db::PmSettlementAccountGateCounts {
186 fn from(row: AccountGateCountsRow) -> Self {
187 Self {
188 total_accounts: row.total_accounts,
189 bridged_accounts: row.bridged_accounts,
190 debt_accounts: row.debt_accounts,
191 overdue_bridge_accounts: row.overdue_bridge_accounts,
192 active_recovery_accounts: row.active_recovery_accounts,
193 active_bridge_usdc: row.active_bridge_usdc,
194 active_debt_usdc: row.active_debt_usdc,
195 }
196 }
197}
198
199#[derive(QueryableByName)]
200struct EventRow {
201 #[diesel(sql_type = Text)]
202 event_key: String,
203 #[diesel(sql_type = Bytea)]
204 wallet: Vec<u8>,
205 #[diesel(sql_type = Text)]
206 underlying: String,
207 #[diesel(sql_type = Text)]
208 event_type: String,
209 #[diesel(sql_type = Numeric)]
210 amount_usdc: rust_decimal::Decimal,
211 #[diesel(sql_type = SqlUuid)]
212 request_id: DbUuid,
213 #[diesel(sql_type = Text)]
214 input_digest: String,
215 #[diesel(sql_type = BigInt)]
216 engine_command_id: i64,
217 #[diesel(sql_type = BigInt)]
218 projection_seq: i64,
219 #[diesel(sql_type = Timestamptz)]
220 created_at: chrono::DateTime<chrono::Utc>,
221}
222
223impl TryFrom<EventRow> for hypercall_db::PmSettlementEventProjection {
224 type Error = anyhow::Error;
225
226 fn try_from(row: EventRow) -> Result<Self> {
227 Ok(Self {
228 event_key: row.event_key,
229 wallet: wallet_from_bytes(row.wallet)?,
230 underlying: row.underlying,
231 event_type: row.event_type,
232 amount_usdc: row.amount_usdc,
233 request_id: row.request_id.into(),
234 input_digest: row.input_digest,
235 engine_command_id: row.engine_command_id,
236 projection_seq: row.projection_seq,
237 created_at: row.created_at,
238 })
239 }
240}
241
242#[derive(QueryableByName)]
243struct InterestEventRow {
244 #[diesel(sql_type = SqlUuid)]
245 request_id: DbUuid,
246 #[diesel(sql_type = Bytea)]
247 wallet: Vec<u8>,
248 #[diesel(sql_type = Text)]
249 underlying: String,
250 #[diesel(sql_type = BigInt)]
251 from_ms: i64,
252 #[diesel(sql_type = BigInt)]
253 to_ms: i64,
254 #[diesel(sql_type = Numeric)]
255 utilization: rust_decimal::Decimal,
256 #[diesel(sql_type = Numeric)]
257 apr: rust_decimal::Decimal,
258 #[diesel(sql_type = Numeric)]
259 interest_usdc: rust_decimal::Decimal,
260 #[diesel(sql_type = Integer)]
261 policy_version: i32,
262 #[diesel(sql_type = BigInt)]
263 engine_command_id: i64,
264 #[diesel(sql_type = BigInt)]
265 projection_seq: i64,
266 #[diesel(sql_type = Timestamptz)]
267 created_at: chrono::DateTime<chrono::Utc>,
268}
269
270impl TryFrom<InterestEventRow> for hypercall_db::PmSettlementInterestEventProjection {
271 type Error = anyhow::Error;
272
273 fn try_from(row: InterestEventRow) -> Result<Self> {
274 Ok(Self {
275 request_id: row.request_id.into(),
276 wallet: wallet_from_bytes(row.wallet)?,
277 underlying: row.underlying,
278 from_ms: row.from_ms,
279 to_ms: row.to_ms,
280 utilization: row.utilization,
281 apr: row.apr,
282 interest_usdc: row.interest_usdc,
283 policy_version: row.policy_version,
284 engine_command_id: row.engine_command_id,
285 projection_seq: row.projection_seq,
286 created_at: row.created_at,
287 })
288 }
289}
290
291#[derive(QueryableByName)]
292struct RepaymentEventRow {
293 #[diesel(sql_type = SqlUuid)]
294 request_id: DbUuid,
295 #[diesel(sql_type = Bytea)]
296 wallet: Vec<u8>,
297 #[diesel(sql_type = Text)]
298 underlying: String,
299 #[diesel(sql_type = Numeric)]
300 amount_usdc: rust_decimal::Decimal,
301 #[diesel(sql_type = Numeric)]
302 interest_paid_usdc: rust_decimal::Decimal,
303 #[diesel(sql_type = Numeric)]
304 principal_paid_usdc: rust_decimal::Decimal,
305 #[diesel(sql_type = Text)]
306 reason: String,
307 #[diesel(sql_type = Text)]
308 source_event_id: String,
309 #[diesel(sql_type = BigInt)]
310 engine_command_id: i64,
311 #[diesel(sql_type = BigInt)]
312 projection_seq: i64,
313 #[diesel(sql_type = Timestamptz)]
314 created_at: chrono::DateTime<chrono::Utc>,
315}
316
317impl TryFrom<RepaymentEventRow> for hypercall_db::PmSettlementRepaymentEventProjection {
318 type Error = anyhow::Error;
319
320 fn try_from(row: RepaymentEventRow) -> Result<Self> {
321 Ok(Self {
322 request_id: row.request_id.into(),
323 wallet: wallet_from_bytes(row.wallet)?,
324 underlying: row.underlying,
325 amount_usdc: row.amount_usdc,
326 interest_paid_usdc: row.interest_paid_usdc,
327 principal_paid_usdc: row.principal_paid_usdc,
328 reason: row.reason,
329 source_event_id: row.source_event_id,
330 engine_command_id: row.engine_command_id,
331 projection_seq: row.projection_seq,
332 created_at: row.created_at,
333 })
334 }
335}
336
337#[derive(QueryableByName)]
338struct RecoveryPlanRow {
339 #[diesel(sql_type = Text)]
340 plan_id: String,
341 #[diesel(sql_type = Bytea)]
342 wallet: Vec<u8>,
343 #[diesel(sql_type = Text)]
344 underlying: String,
345 #[diesel(sql_type = Text)]
346 status: String,
347 #[diesel(sql_type = Bool)]
348 placeholder: bool,
349 #[diesel(sql_type = Nullable<Text>)]
350 trigger: Option<String>,
351 #[diesel(sql_type = Nullable<Text>)]
352 reason: Option<String>,
353 #[diesel(sql_type = Nullable<Integer>)]
354 policy_version: Option<i32>,
355 #[diesel(sql_type = Nullable<Integer>)]
356 recovery_priority_version: Option<i32>,
357 #[diesel(sql_type = Nullable<Numeric>)]
358 target_reduction_usdc: Option<rust_decimal::Decimal>,
359 #[diesel(sql_type = Nullable<Numeric>)]
360 expected_usdc_recovered: Option<rust_decimal::Decimal>,
361 #[diesel(sql_type = Nullable<Numeric>)]
362 expected_obligation_reduced: Option<rust_decimal::Decimal>,
363 #[diesel(sql_type = Nullable<Numeric>)]
364 expected_impact_usdc: Option<rust_decimal::Decimal>,
365 #[diesel(sql_type = Nullable<Numeric>)]
366 post_plan_utilization: Option<rust_decimal::Decimal>,
367 #[diesel(sql_type = BigInt)]
368 engine_command_id: i64,
369 #[diesel(sql_type = BigInt)]
370 projection_seq: i64,
371 #[diesel(sql_type = Timestamptz)]
372 created_at: chrono::DateTime<chrono::Utc>,
373 #[diesel(sql_type = Timestamptz)]
374 updated_at: chrono::DateTime<chrono::Utc>,
375}
376
377impl TryFrom<RecoveryPlanRow> for hypercall_db::PmRecoveryPlanProjection {
378 type Error = anyhow::Error;
379
380 fn try_from(row: RecoveryPlanRow) -> Result<Self> {
381 Ok(Self {
382 plan_id: row.plan_id,
383 wallet: wallet_from_bytes(row.wallet)?,
384 underlying: row.underlying,
385 status: row.status,
386 placeholder: row.placeholder,
387 trigger: row.trigger,
388 reason: row.reason,
389 policy_version: row.policy_version,
390 recovery_priority_version: row.recovery_priority_version,
391 target_reduction_usdc: row.target_reduction_usdc,
392 expected_usdc_recovered: row.expected_usdc_recovered,
393 expected_obligation_reduced: row.expected_obligation_reduced,
394 expected_impact_usdc: row.expected_impact_usdc,
395 post_plan_utilization: row.post_plan_utilization,
396 engine_command_id: row.engine_command_id,
397 projection_seq: row.projection_seq,
398 created_at: row.created_at,
399 updated_at: row.updated_at,
400 })
401 }
402}
403
404#[derive(QueryableByName)]
405struct RecoveryActionRow {
406 #[diesel(sql_type = Text)]
407 plan_id: String,
408 #[diesel(sql_type = Text)]
409 action_id: String,
410 #[diesel(sql_type = Text)]
411 action_type: String,
412 #[diesel(sql_type = Text)]
413 status: String,
414 #[diesel(sql_type = Nullable<Text>)]
415 target: Option<String>,
416 #[diesel(sql_type = Nullable<Integer>)]
417 attempt: Option<i32>,
418 #[diesel(sql_type = Nullable<Text>)]
419 external_id: Option<String>,
420 #[diesel(sql_type = Nullable<Text>)]
421 external_kind: Option<String>,
422 #[diesel(sql_type = Nullable<Text>)]
423 result_external_id: Option<String>,
424 #[diesel(sql_type = Nullable<Text>)]
425 result: Option<String>,
426 #[diesel(sql_type = Nullable<Numeric>)]
427 expected_usdc_recovered: Option<rust_decimal::Decimal>,
428 #[diesel(sql_type = Nullable<Numeric>)]
429 expected_obligation_reduced: Option<rust_decimal::Decimal>,
430 #[diesel(sql_type = Nullable<Numeric>)]
431 expected_impact_usdc: Option<rust_decimal::Decimal>,
432 #[diesel(sql_type = Nullable<Numeric>)]
433 recovered_usdc: Option<rust_decimal::Decimal>,
434 #[diesel(sql_type = Nullable<Numeric>)]
435 liability_reduction_usdc: Option<rust_decimal::Decimal>,
436 #[diesel(sql_type = BigInt)]
437 engine_command_id: i64,
438 #[diesel(sql_type = BigInt)]
439 projection_seq: i64,
440 #[diesel(sql_type = Timestamptz)]
441 created_at: chrono::DateTime<chrono::Utc>,
442 #[diesel(sql_type = Timestamptz)]
443 updated_at: chrono::DateTime<chrono::Utc>,
444}
445
446impl From<RecoveryActionRow> for hypercall_db::PmRecoveryActionProjection {
447 fn from(row: RecoveryActionRow) -> Self {
448 Self {
449 plan_id: row.plan_id,
450 action_id: row.action_id,
451 action_type: row.action_type,
452 status: row.status,
453 target: row.target,
454 attempt: row.attempt,
455 external_id: row.external_id,
456 external_kind: row.external_kind,
457 result_external_id: row.result_external_id,
458 result: row.result,
459 expected_usdc_recovered: row.expected_usdc_recovered,
460 expected_obligation_reduced: row.expected_obligation_reduced,
461 expected_impact_usdc: row.expected_impact_usdc,
462 recovered_usdc: row.recovered_usdc,
463 liability_reduction_usdc: row.liability_reduction_usdc,
464 engine_command_id: row.engine_command_id,
465 projection_seq: row.projection_seq,
466 created_at: row.created_at,
467 updated_at: row.updated_at,
468 }
469 }
470}
471
472#[derive(QueryableByName)]
473struct RecoveryActionGateCountsRow {
474 #[diesel(sql_type = BigInt)]
475 total_actions: i64,
476 #[diesel(sql_type = BigInt)]
477 planned_actions: i64,
478 #[diesel(sql_type = BigInt)]
479 submitted_actions: i64,
480 #[diesel(sql_type = BigInt)]
481 terminal_actions: i64,
482}
483
484impl From<RecoveryActionGateCountsRow> for hypercall_db::PmRecoveryActionGateCounts {
485 fn from(row: RecoveryActionGateCountsRow) -> Self {
486 Self {
487 total_actions: row.total_actions,
488 planned_actions: row.planned_actions,
489 submitted_actions: row.submitted_actions,
490 terminal_actions: row.terminal_actions,
491 }
492 }
493}
494
495fn apply_pm_settlement_projection_write(
496 conn: &mut PgConnection,
497 write: &hypercall_db::PmSettlementProjectionWrite,
498) -> QueryResult<usize> {
499 match write {
500 hypercall_db::PmSettlementProjectionWrite::Pool(pool) => {
501 diesel::sql_query(
502 "INSERT INTO pm_settlement_pools (
503 underlying, config_version, policy_version, pool_available_usdc,
504 pool_target_usdc, pool_capacity_usdc, pool_utilization,
505 active_timing_bridge_usdc, active_settlement_debt_usdc,
506 target_short_oi_notional_multiplier, utilization_kink, apr_at_kink,
507 max_apr, normal_utilization_cap, crisis_utilization_cap,
508 bridge_window_ms, projection_seq, updated_at
509 ) VALUES (
510 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, to_timestamp($18::double precision / 1000.0)
511 )
512 ON CONFLICT (underlying) DO UPDATE SET
513 config_version = EXCLUDED.config_version,
514 policy_version = EXCLUDED.policy_version,
515 pool_available_usdc = EXCLUDED.pool_available_usdc,
516 pool_target_usdc = EXCLUDED.pool_target_usdc,
517 pool_capacity_usdc = EXCLUDED.pool_capacity_usdc,
518 pool_utilization = EXCLUDED.pool_utilization,
519 active_timing_bridge_usdc = EXCLUDED.active_timing_bridge_usdc,
520 active_settlement_debt_usdc = EXCLUDED.active_settlement_debt_usdc,
521 target_short_oi_notional_multiplier = EXCLUDED.target_short_oi_notional_multiplier,
522 utilization_kink = EXCLUDED.utilization_kink,
523 apr_at_kink = EXCLUDED.apr_at_kink,
524 max_apr = EXCLUDED.max_apr,
525 normal_utilization_cap = EXCLUDED.normal_utilization_cap,
526 crisis_utilization_cap = EXCLUDED.crisis_utilization_cap,
527 bridge_window_ms = EXCLUDED.bridge_window_ms,
528 projection_seq = GREATEST(pm_settlement_pools.projection_seq, EXCLUDED.projection_seq),
529 updated_at = EXCLUDED.updated_at
530 WHERE pm_settlement_pools.projection_seq <= EXCLUDED.projection_seq",
531 )
532 .bind::<Text, _>(&pool.underlying)
533 .bind::<Integer, _>(pool.config_version)
534 .bind::<Integer, _>(pool.policy_version)
535 .bind::<Numeric, _>(pool.pool_available_usdc)
536 .bind::<Numeric, _>(pool.pool_target_usdc)
537 .bind::<Numeric, _>(pool.pool_capacity_usdc)
538 .bind::<Nullable<Numeric>, _>(pool.pool_utilization)
539 .bind::<Numeric, _>(pool.active_timing_bridge_usdc)
540 .bind::<Numeric, _>(pool.active_settlement_debt_usdc)
541 .bind::<Nullable<Numeric>, _>(pool.target_short_oi_notional_multiplier)
542 .bind::<Nullable<Numeric>, _>(pool.utilization_kink)
543 .bind::<Nullable<Numeric>, _>(pool.apr_at_kink)
544 .bind::<Nullable<Numeric>, _>(pool.max_apr)
545 .bind::<Nullable<Numeric>, _>(pool.normal_utilization_cap)
546 .bind::<Nullable<Numeric>, _>(pool.crisis_utilization_cap)
547 .bind::<Nullable<BigInt>, _>(pool.bridge_window_ms)
548 .bind::<BigInt, _>(pool.projection_seq)
549 .bind::<BigInt, _>(pool.updated_at_ms)
550 .execute(conn)
551 }
552 hypercall_db::PmSettlementProjectionWrite::Account(account) => {
553 diesel::sql_query(
554 "INSERT INTO pm_settlement_accounts (
555 wallet, underlying, status, timing_bridge_principal_usdc,
556 settlement_debt_principal_usdc, accrued_interest_usdc,
557 interest_cursor_ms, bridge_deadline_ms, active_recovery_plan_id,
558 policy_version, projection_seq, updated_at
559 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, to_timestamp($12::double precision / 1000.0))
560 ON CONFLICT (wallet, underlying) DO UPDATE SET
561 status = EXCLUDED.status,
562 timing_bridge_principal_usdc = EXCLUDED.timing_bridge_principal_usdc,
563 settlement_debt_principal_usdc = EXCLUDED.settlement_debt_principal_usdc,
564 accrued_interest_usdc = EXCLUDED.accrued_interest_usdc,
565 interest_cursor_ms = EXCLUDED.interest_cursor_ms,
566 bridge_deadline_ms = EXCLUDED.bridge_deadline_ms,
567 active_recovery_plan_id = EXCLUDED.active_recovery_plan_id,
568 policy_version = EXCLUDED.policy_version,
569 projection_seq = GREATEST(pm_settlement_accounts.projection_seq, EXCLUDED.projection_seq),
570 updated_at = EXCLUDED.updated_at
571 WHERE pm_settlement_accounts.projection_seq <= EXCLUDED.projection_seq",
572 )
573 .bind::<Bytea, _>(account.wallet.as_bytes())
574 .bind::<Text, _>(&account.underlying)
575 .bind::<Text, _>(&account.status)
576 .bind::<Numeric, _>(account.timing_bridge_principal_usdc)
577 .bind::<Numeric, _>(account.settlement_debt_principal_usdc)
578 .bind::<Numeric, _>(account.accrued_interest_usdc)
579 .bind::<BigInt, _>(account.interest_cursor_ms)
580 .bind::<Nullable<BigInt>, _>(account.bridge_deadline_ms)
581 .bind::<Nullable<Text>, _>(&account.active_recovery_plan_id)
582 .bind::<Integer, _>(account.policy_version)
583 .bind::<BigInt, _>(account.projection_seq)
584 .bind::<BigInt, _>(account.updated_at_ms)
585 .execute(conn)
586 }
587 hypercall_db::PmSettlementProjectionWrite::Event(event) => {
588 diesel::sql_query(
589 "INSERT INTO pm_settlement_events (
590 event_key, wallet, underlying, event_type, amount_usdc,
591 request_id, input_digest
592 ) VALUES ($1, $2, $3, $4, $5, $6, $7)
593 ON CONFLICT (event_key) DO UPDATE SET
594 event_type = EXCLUDED.event_type,
595 amount_usdc = EXCLUDED.amount_usdc,
596 request_id = EXCLUDED.request_id,
597 input_digest = EXCLUDED.input_digest",
598 )
599 .bind::<Text, _>(&event.event_key)
600 .bind::<Bytea, _>(event.wallet.as_bytes())
601 .bind::<Text, _>(&event.underlying)
602 .bind::<Text, _>(&event.event_type)
603 .bind::<Numeric, _>(event.amount_usdc)
604 .bind::<SqlUuid, _>(DbUuid::from(event.request_id))
605 .bind::<Text, _>(&event.input_digest)
606 .execute(conn)
607 }
608 hypercall_db::PmSettlementProjectionWrite::InterestEvent(event) => {
609 diesel::sql_query(
610 "INSERT INTO pm_settlement_interest_events (
611 request_id, wallet, underlying, from_ms, to_ms, utilization, apr,
612 interest_usdc, policy_version
613 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
614 ON CONFLICT (request_id) DO NOTHING",
615 )
616 .bind::<SqlUuid, _>(DbUuid(event.request_id))
617 .bind::<Bytea, _>(event.wallet.as_bytes())
618 .bind::<Text, _>(&event.underlying)
619 .bind::<BigInt, _>(event.from_ms)
620 .bind::<BigInt, _>(event.to_ms)
621 .bind::<Numeric, _>(event.utilization)
622 .bind::<Numeric, _>(event.apr)
623 .bind::<Numeric, _>(event.interest_usdc)
624 .bind::<Integer, _>(event.policy_version)
625 .execute(conn)
626 }
627 hypercall_db::PmSettlementProjectionWrite::RepaymentEvent(event) => {
628 diesel::sql_query(
629 "INSERT INTO pm_settlement_repayment_events (
630 request_id, wallet, underlying, amount_usdc, interest_paid_usdc,
631 principal_paid_usdc, reason, source_event_id
632 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
633 ON CONFLICT (request_id) DO NOTHING",
634 )
635 .bind::<SqlUuid, _>(DbUuid(event.request_id))
636 .bind::<Bytea, _>(event.wallet.as_bytes())
637 .bind::<Text, _>(&event.underlying)
638 .bind::<Numeric, _>(event.amount_usdc)
639 .bind::<Numeric, _>(event.interest_paid_usdc)
640 .bind::<Numeric, _>(event.principal_paid_usdc)
641 .bind::<Text, _>(&event.reason)
642 .bind::<Text, _>(&event.source_event_id)
643 .execute(conn)
644 }
645 hypercall_db::PmSettlementProjectionWrite::VaultDeposit(deposit) => {
646 diesel::sql_query(
647 "INSERT INTO pm_vault_deposits (
648 deposit_id, depositor, underlying, principal_usdc, remaining_usdc,
649 withdrawn_usdc, reserved_withdrawal_usdc, chain_id, source_contract_address,
650 tx_hash, log_index, max_listed_expiry_ts_ms, settlement_grace_ms,
651 lock_until_ms, status, projection_seq, created_at, updated_at
652 ) VALUES (
653 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
654 $15::pm_vault_deposit_status, $16, to_timestamp($17::double precision / 1000.0),
655 to_timestamp($18::double precision / 1000.0)
656 )
657 ON CONFLICT (deposit_id) DO UPDATE SET
658 remaining_usdc = EXCLUDED.remaining_usdc,
659 withdrawn_usdc = EXCLUDED.withdrawn_usdc,
660 reserved_withdrawal_usdc = EXCLUDED.reserved_withdrawal_usdc,
661 status = EXCLUDED.status,
662 projection_seq = GREATEST(pm_vault_deposits.projection_seq, EXCLUDED.projection_seq),
663 updated_at = EXCLUDED.updated_at
664 WHERE pm_vault_deposits.projection_seq <= EXCLUDED.projection_seq",
665 )
666 .bind::<SqlUuid, _>(DbUuid(deposit.deposit_id))
667 .bind::<Bytea, _>(deposit.depositor.as_bytes())
668 .bind::<Text, _>(&deposit.underlying)
669 .bind::<Numeric, _>(deposit.principal_usdc)
670 .bind::<Numeric, _>(deposit.remaining_usdc)
671 .bind::<Numeric, _>(deposit.withdrawn_usdc)
672 .bind::<Numeric, _>(deposit.reserved_withdrawal_usdc)
673 .bind::<BigInt, _>(deposit.chain_id)
674 .bind::<Bytea, _>(deposit.source_contract_address.as_bytes())
675 .bind::<Text, _>(&deposit.tx_hash)
676 .bind::<Integer, _>(deposit.log_index)
677 .bind::<BigInt, _>(deposit.max_listed_expiry_ts_ms)
678 .bind::<BigInt, _>(deposit.settlement_grace_ms)
679 .bind::<BigInt, _>(deposit.lock_until_ms)
680 .bind::<Text, _>(deposit.status.as_str())
681 .bind::<BigInt, _>(deposit.projection_seq)
682 .bind::<BigInt, _>(deposit.created_at_ms)
683 .bind::<BigInt, _>(deposit.updated_at_ms)
684 .execute(conn)
685 }
686 hypercall_db::PmSettlementProjectionWrite::VaultWithdrawal(withdrawal) => {
687 diesel::sql_query(
688 "INSERT INTO pm_vault_withdrawals (
689 withdrawal_id, deposit_id, depositor, underlying, amount_usdc,
690 lock_until_ms, status, projection_seq, requested_at, updated_at
691 ) VALUES (
692 $1, $2, $3, $4, $5, $6, $7::pm_vault_withdrawal_status, $8,
693 to_timestamp($9::double precision / 1000.0),
694 to_timestamp($10::double precision / 1000.0)
695 )
696 ON CONFLICT (withdrawal_id) DO UPDATE SET
697 status = EXCLUDED.status,
698 projection_seq = GREATEST(pm_vault_withdrawals.projection_seq, EXCLUDED.projection_seq),
699 updated_at = EXCLUDED.updated_at
700 WHERE pm_vault_withdrawals.projection_seq <= EXCLUDED.projection_seq",
701 )
702 .bind::<SqlUuid, _>(DbUuid(withdrawal.withdrawal_id))
703 .bind::<SqlUuid, _>(DbUuid(withdrawal.deposit_id))
704 .bind::<Bytea, _>(withdrawal.depositor.as_bytes())
705 .bind::<Text, _>(&withdrawal.underlying)
706 .bind::<Numeric, _>(withdrawal.amount_usdc)
707 .bind::<BigInt, _>(withdrawal.lock_until_ms)
708 .bind::<Text, _>(withdrawal.status.as_str())
709 .bind::<BigInt, _>(withdrawal.projection_seq)
710 .bind::<BigInt, _>(withdrawal.requested_at_ms)
711 .bind::<BigInt, _>(withdrawal.updated_at_ms)
712 .execute(conn)
713 }
714 hypercall_db::PmSettlementProjectionWrite::RecoveryPlan(plan) => {
715 let updated = diesel::sql_query(
716 "INSERT INTO pm_recovery_plans (
717 plan_id, wallet, underlying, status, placeholder, trigger, reason,
718 policy_version, recovery_priority_version, target_reduction_usdc,
719 expected_usdc_recovered, expected_obligation_reduced, expected_impact_usdc,
720 post_plan_utilization, projection_seq, updated_at
721 ) VALUES (
722 $1, $2, $3, $4, false, $5, $6, $7, $8, $9, $10, $11, $12, $13,
723 $14, to_timestamp($15::double precision / 1000.0)
724 )
725 ON CONFLICT (plan_id) DO UPDATE SET
726 wallet = EXCLUDED.wallet,
727 underlying = EXCLUDED.underlying,
728 status = EXCLUDED.status,
729 placeholder = false,
730 trigger = EXCLUDED.trigger,
731 reason = EXCLUDED.reason,
732 policy_version = EXCLUDED.policy_version,
733 recovery_priority_version = EXCLUDED.recovery_priority_version,
734 target_reduction_usdc = EXCLUDED.target_reduction_usdc,
735 expected_usdc_recovered = EXCLUDED.expected_usdc_recovered,
736 expected_obligation_reduced = EXCLUDED.expected_obligation_reduced,
737 expected_impact_usdc = EXCLUDED.expected_impact_usdc,
738 post_plan_utilization = EXCLUDED.post_plan_utilization,
739 projection_seq = GREATEST(pm_recovery_plans.projection_seq, EXCLUDED.projection_seq),
740 updated_at = EXCLUDED.updated_at
741 WHERE pm_recovery_plans.projection_seq <= EXCLUDED.projection_seq",
742 )
743 .bind::<Text, _>(&plan.plan_id)
744 .bind::<Bytea, _>(plan.wallet.as_bytes())
745 .bind::<Text, _>(&plan.underlying)
746 .bind::<Text, _>(&plan.status)
747 .bind::<Text, _>(&plan.trigger)
748 .bind::<Text, _>(&plan.reason)
749 .bind::<Integer, _>(plan.policy_version)
750 .bind::<Integer, _>(plan.recovery_priority_version)
751 .bind::<Numeric, _>(plan.target_reduction_usdc)
752 .bind::<Numeric, _>(plan.expected_usdc_recovered)
753 .bind::<Numeric, _>(plan.expected_obligation_reduced)
754 .bind::<Numeric, _>(plan.expected_impact_usdc)
755 .bind::<Nullable<Numeric>, _>(plan.post_plan_utilization)
756 .bind::<BigInt, _>(plan.projection_seq)
757 .bind::<BigInt, _>(plan.updated_at_ms)
758 .execute(conn)?;
759
760 for action in &plan.actions {
761 diesel::sql_query(
762 "INSERT INTO pm_recovery_actions (
763 plan_id, action_id, action_type, status, target, attempt,
764 external_id, external_kind, result_external_id, result, expected_usdc_recovered,
765 expected_obligation_reduced, expected_impact_usdc, recovered_usdc,
766 liability_reduction_usdc, projection_seq, updated_at
767 ) VALUES (
768 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
769 $15, $16, to_timestamp($17::double precision / 1000.0)
770 )
771 ON CONFLICT (plan_id, action_id) DO UPDATE SET
772 action_type = EXCLUDED.action_type,
773 status = EXCLUDED.status,
774 target = EXCLUDED.target,
775 attempt = EXCLUDED.attempt,
776 external_id = EXCLUDED.external_id,
777 external_kind = EXCLUDED.external_kind,
778 result_external_id = EXCLUDED.result_external_id,
779 result = EXCLUDED.result,
780 expected_usdc_recovered = EXCLUDED.expected_usdc_recovered,
781 expected_obligation_reduced = EXCLUDED.expected_obligation_reduced,
782 expected_impact_usdc = EXCLUDED.expected_impact_usdc,
783 recovered_usdc = EXCLUDED.recovered_usdc,
784 liability_reduction_usdc = EXCLUDED.liability_reduction_usdc,
785 projection_seq = GREATEST(pm_recovery_actions.projection_seq, EXCLUDED.projection_seq),
786 updated_at = EXCLUDED.updated_at
787 WHERE pm_recovery_actions.projection_seq <= EXCLUDED.projection_seq",
788 )
789 .bind::<Text, _>(&action.plan_id)
790 .bind::<Text, _>(&action.action_id)
791 .bind::<Text, _>(&action.action_type)
792 .bind::<Text, _>(&action.status)
793 .bind::<Text, _>(&action.target)
794 .bind::<Integer, _>(action.attempt)
795 .bind::<Nullable<Text>, _>(&action.external_id)
796 .bind::<Nullable<Text>, _>(&action.external_kind)
797 .bind::<Nullable<Text>, _>(&action.result_external_id)
798 .bind::<Nullable<Text>, _>(&action.result)
799 .bind::<Numeric, _>(action.expected_usdc_recovered)
800 .bind::<Numeric, _>(action.expected_obligation_reduced)
801 .bind::<Numeric, _>(action.expected_impact_usdc)
802 .bind::<Numeric, _>(action.recovered_usdc)
803 .bind::<Numeric, _>(action.liability_reduction_usdc)
804 .bind::<BigInt, _>(action.projection_seq)
805 .bind::<BigInt, _>(action.updated_at_ms)
806 .execute(conn)?;
807 }
808 Ok(updated)
809 }
810 }
811}
812
813#[async_trait::async_trait]
814impl hypercall_db::PmSettlementProjectionReader for DieselDb {
815 async fn list_pm_settlement_pools(
816 &self,
817 ) -> Result<Vec<hypercall_db::PmSettlementPoolProjection>> {
818 let mut conn = self.get_conn().await?;
819 let rows = diesel_async::RunQueryDsl::load::<PoolRow>(
820 diesel::sql_query("SELECT * FROM pm_settlement_pools ORDER BY underlying LIMIT 1000"),
821 &mut conn,
822 )
823 .await?;
824 Ok(rows.into_iter().map(Into::into).collect())
825 }
826
827 async fn pm_settlement_pool_gate_counts(
828 &self,
829 ) -> Result<hypercall_db::PmSettlementPoolGateCounts> {
830 let mut conn = self.get_conn().await?;
831 let row = diesel_async::RunQueryDsl::get_result::<PoolGateCountsRow>(
832 diesel::sql_query(
833 "SELECT
834 COUNT(*)::bigint AS total_pools,
835 COUNT(*) FILTER (WHERE pool_available_usdc < pool_target_usdc)::bigint AS below_target_pools,
836 COUNT(*) FILTER (
837 WHERE pool_utilization IS NOT NULL
838 AND crisis_utilization_cap IS NOT NULL
839 AND pool_utilization > crisis_utilization_cap
840 )::bigint AS above_crisis_cap_pools,
841 COUNT(*) FILTER (WHERE pool_utilization IS NULL)::bigint AS missing_utilization_pools,
842 COUNT(*) FILTER (WHERE crisis_utilization_cap IS NULL)::bigint AS missing_crisis_cap_pools
843 FROM pm_settlement_pools",
844 ),
845 &mut conn,
846 )
847 .await?;
848 Ok(row.into())
849 }
850
851 async fn list_pm_settlement_accounts(
852 &self,
853 ) -> Result<Vec<hypercall_db::PmSettlementAccountProjection>> {
854 let mut conn = self.get_conn().await?;
855 let rows = diesel_async::RunQueryDsl::load::<AccountRow>(
856 diesel::sql_query(
857 "SELECT * FROM pm_settlement_accounts ORDER BY underlying, wallet LIMIT 1000",
858 ),
859 &mut conn,
860 )
861 .await?;
862 rows.into_iter().map(TryInto::try_into).collect()
863 }
864
865 async fn pm_settlement_account_gate_counts(
866 &self,
867 now_ms: i64,
868 ) -> Result<hypercall_db::PmSettlementAccountGateCounts> {
869 let mut conn = self.get_conn().await?;
870 let row = diesel_async::RunQueryDsl::get_result::<AccountGateCountsRow>(
871 diesel::sql_query(
872 "SELECT
873 COUNT(*)::bigint AS total_accounts,
874 COUNT(*) FILTER (WHERE timing_bridge_principal_usdc > 0)::bigint AS bridged_accounts,
875 COUNT(*) FILTER (WHERE settlement_debt_principal_usdc > 0)::bigint AS debt_accounts,
876 COUNT(*) FILTER (
877 WHERE timing_bridge_principal_usdc > 0
878 AND (
879 bridge_deadline_ms IS NULL
880 OR bridge_deadline_ms < $1
881 )
882 )::bigint AS overdue_bridge_accounts,
883 COUNT(*) FILTER (
884 WHERE active_recovery_plan_id IS NOT NULL
885 AND (
886 timing_bridge_principal_usdc > 0
887 OR settlement_debt_principal_usdc > 0
888 OR accrued_interest_usdc > 0
889 )
890 )::bigint AS active_recovery_accounts,
891 COALESCE(SUM(timing_bridge_principal_usdc) FILTER (WHERE timing_bridge_principal_usdc > 0), 0) AS active_bridge_usdc,
892 COALESCE(SUM(settlement_debt_principal_usdc) FILTER (WHERE settlement_debt_principal_usdc > 0), 0) AS active_debt_usdc
893 FROM pm_settlement_accounts",
894 )
895 .bind::<BigInt, _>(now_ms),
896 &mut conn,
897 )
898 .await?;
899 Ok(row.into())
900 }
901
902 async fn list_pm_settlement_events(
903 &self,
904 ) -> Result<Vec<hypercall_db::PmSettlementEventProjection>> {
905 let mut conn = self.get_conn().await?;
906 let rows = diesel_async::RunQueryDsl::load::<EventRow>(
907 diesel::sql_query(
908 "SELECT * FROM pm_settlement_events ORDER BY created_at DESC, event_key LIMIT 1000",
909 ),
910 &mut conn,
911 )
912 .await?;
913 rows.into_iter().map(TryInto::try_into).collect()
914 }
915
916 async fn list_pm_settlement_interest_events(
917 &self,
918 ) -> Result<Vec<hypercall_db::PmSettlementInterestEventProjection>> {
919 let mut conn = self.get_conn().await?;
920 let rows = diesel_async::RunQueryDsl::load::<InterestEventRow>(
921 diesel::sql_query(
922 "SELECT * FROM pm_settlement_interest_events ORDER BY created_at DESC, request_id LIMIT 1000",
923 ),
924 &mut conn,
925 )
926 .await?;
927 rows.into_iter().map(TryInto::try_into).collect()
928 }
929
930 async fn list_pm_settlement_repayment_events(
931 &self,
932 ) -> Result<Vec<hypercall_db::PmSettlementRepaymentEventProjection>> {
933 let mut conn = self.get_conn().await?;
934 let rows = diesel_async::RunQueryDsl::load::<RepaymentEventRow>(
935 diesel::sql_query(
936 "SELECT * FROM pm_settlement_repayment_events ORDER BY created_at DESC, request_id LIMIT 1000",
937 ),
938 &mut conn,
939 )
940 .await?;
941 rows.into_iter().map(TryInto::try_into).collect()
942 }
943
944 async fn list_pm_recovery_plans(&self) -> Result<Vec<hypercall_db::PmRecoveryPlanProjection>> {
945 let mut conn = self.get_conn().await?;
946 let rows = diesel_async::RunQueryDsl::load::<RecoveryPlanRow>(
947 diesel::sql_query(
948 "SELECT * FROM pm_recovery_plans ORDER BY created_at DESC, plan_id LIMIT 1000",
949 ),
950 &mut conn,
951 )
952 .await?;
953 rows.into_iter().map(TryInto::try_into).collect()
954 }
955
956 async fn list_pm_recovery_actions(
957 &self,
958 ) -> Result<Vec<hypercall_db::PmRecoveryActionProjection>> {
959 let mut conn = self.get_conn().await?;
960 let rows = diesel_async::RunQueryDsl::load::<RecoveryActionRow>(
961 diesel::sql_query(
962 "SELECT * FROM pm_recovery_actions ORDER BY created_at DESC, plan_id, action_id LIMIT 1000",
963 ),
964 &mut conn,
965 )
966 .await?;
967 Ok(rows.into_iter().map(Into::into).collect())
968 }
969
970 async fn pm_recovery_action_gate_counts(
971 &self,
972 ) -> Result<hypercall_db::PmRecoveryActionGateCounts> {
973 let mut conn = self.get_conn().await?;
974 let row = diesel_async::RunQueryDsl::get_result::<RecoveryActionGateCountsRow>(
975 diesel::sql_query(
976 "SELECT
977 COUNT(*)::bigint AS total_actions,
978 COUNT(*) FILTER (WHERE status = 'Planned')::bigint AS planned_actions,
979 COUNT(*) FILTER (WHERE status = 'Submitted')::bigint AS submitted_actions,
980 COUNT(*) FILTER (WHERE status IN ('Confirmed', 'Failed', 'Canceled', 'TimedOut'))::bigint AS terminal_actions
981 FROM pm_recovery_actions",
982 ),
983 &mut conn,
984 )
985 .await?;
986 Ok(row.into())
987 }
988}
989
990impl hypercall_db::PmSettlementProjectionSyncReader for DatabaseHandler {
991 fn list_pm_settlement_pools_sync(
992 &self,
993 ) -> Result<Vec<hypercall_db::PmSettlementPoolProjection>> {
994 let mut conn = self.pool().get()?;
995 let rows =
996 diesel::sql_query("SELECT * FROM pm_settlement_pools ORDER BY underlying LIMIT 1000")
997 .load::<PoolRow>(&mut conn)?;
998 Ok(rows.into_iter().map(Into::into).collect())
999 }
1000
1001 fn list_pm_settlement_accounts_sync(
1002 &self,
1003 ) -> Result<Vec<hypercall_db::PmSettlementAccountProjection>> {
1004 let mut conn = self.pool().get()?;
1005 let rows = diesel::sql_query(
1006 "SELECT * FROM pm_settlement_accounts ORDER BY underlying, wallet LIMIT 1000",
1007 )
1008 .load::<AccountRow>(&mut conn)?;
1009 rows.into_iter().map(TryInto::try_into).collect()
1010 }
1011
1012 fn list_pm_settlement_events_sync(
1013 &self,
1014 ) -> Result<Vec<hypercall_db::PmSettlementEventProjection>> {
1015 let mut conn = self.pool().get()?;
1016 let rows = diesel::sql_query(
1017 "SELECT * FROM pm_settlement_events ORDER BY created_at DESC, event_key LIMIT 1000",
1018 )
1019 .load::<EventRow>(&mut conn)?;
1020 rows.into_iter().map(TryInto::try_into).collect()
1021 }
1022
1023 fn list_pm_settlement_interest_events_sync(
1024 &self,
1025 ) -> Result<Vec<hypercall_db::PmSettlementInterestEventProjection>> {
1026 let mut conn = self.pool().get()?;
1027 let rows = diesel::sql_query(
1028 "SELECT * FROM pm_settlement_interest_events ORDER BY created_at DESC, request_id LIMIT 1000",
1029 )
1030 .load::<InterestEventRow>(&mut conn)?;
1031 rows.into_iter().map(TryInto::try_into).collect()
1032 }
1033
1034 fn list_pm_settlement_repayment_events_sync(
1035 &self,
1036 ) -> Result<Vec<hypercall_db::PmSettlementRepaymentEventProjection>> {
1037 let mut conn = self.pool().get()?;
1038 let rows = diesel::sql_query(
1039 "SELECT * FROM pm_settlement_repayment_events ORDER BY created_at DESC, request_id LIMIT 1000",
1040 )
1041 .load::<RepaymentEventRow>(&mut conn)?;
1042 rows.into_iter().map(TryInto::try_into).collect()
1043 }
1044
1045 fn list_pm_recovery_plans_sync(&self) -> Result<Vec<hypercall_db::PmRecoveryPlanProjection>> {
1046 let mut conn = self.pool().get()?;
1047 let rows = diesel::sql_query(
1048 "SELECT * FROM pm_recovery_plans ORDER BY created_at DESC, plan_id LIMIT 1000",
1049 )
1050 .load::<RecoveryPlanRow>(&mut conn)?;
1051 rows.into_iter().map(TryInto::try_into).collect()
1052 }
1053
1054 fn list_pm_recovery_actions_sync(
1055 &self,
1056 ) -> Result<Vec<hypercall_db::PmRecoveryActionProjection>> {
1057 let mut conn = self.pool().get()?;
1058 let rows = diesel::sql_query(
1059 "SELECT * FROM pm_recovery_actions ORDER BY created_at DESC, plan_id, action_id LIMIT 1000",
1060 )
1061 .load::<RecoveryActionRow>(&mut conn)?;
1062 Ok(rows.into_iter().map(Into::into).collect())
1063 }
1064}
1065
1066impl hypercall_db::PmSettlementProjectionSyncWriter for DatabaseHandler {
1067 fn apply_pm_settlement_projection_writes_sync(
1068 &self,
1069 writes: &[hypercall_db::PmSettlementProjectionWrite],
1070 ) -> Result<()> {
1071 if writes.is_empty() {
1072 return Ok(());
1073 }
1074 let mut conn = self.pool().get()?;
1075 conn.transaction::<_, diesel::result::Error, _>(|conn| {
1076 for write in writes {
1077 apply_pm_settlement_projection_write(conn, write)?;
1078 }
1079 Ok(())
1080 })?;
1081 Ok(())
1082 }
1083}
1084
1085#[cfg(test)]
1086mod tests {
1087 use diesel::RunQueryDsl;
1088 use hypercall_db::{
1089 PmSettlementProjectionReader, PmSettlementProjectionSyncReader,
1090 PmSettlementProjectionSyncWriter,
1091 };
1092 use rust_decimal_macros::dec;
1093
1094 use super::*;
1095 use crate::test_helpers::TestDb;
1096
1097 #[tokio::test]
1098 async fn pm_settlement_projection_reader_lists_phase2_rows() {
1099 let test_db = TestDb::new().await.unwrap();
1100 let db = test_db.handler.as_ref();
1101 let wallet = hypercall_types::wallet_address::test_wallet(168);
1102 let event_request_id = uuid::Uuid::new_v4();
1103 let interest_request_id = uuid::Uuid::new_v4();
1104 let repayment_request_id = uuid::Uuid::new_v4();
1105 let mut conn = db.pool().get().unwrap();
1106
1107 diesel::sql_query(
1108 "INSERT INTO pm_settlement_pools (
1109 underlying, config_version, policy_version, pool_available_usdc,
1110 pool_target_usdc, pool_capacity_usdc, pool_utilization,
1111 active_timing_bridge_usdc, active_settlement_debt_usdc,
1112 target_short_oi_notional_multiplier, utilization_kink, apr_at_kink,
1113 max_apr, normal_utilization_cap, crisis_utilization_cap,
1114 bridge_window_ms, last_engine_command_id, projection_seq
1115 ) VALUES (
1116 'ETH', 1, 2, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, 3600000, 11, 12
1117 )",
1118 )
1119 .bind::<Numeric, _>(dec!(1000))
1120 .bind::<Numeric, _>(dec!(1500))
1121 .bind::<Numeric, _>(dec!(1200))
1122 .bind::<Numeric, _>(dec!(0.25))
1123 .bind::<Numeric, _>(dec!(100))
1124 .bind::<Numeric, _>(dec!(50))
1125 .bind::<Numeric, _>(dec!(0.20))
1126 .bind::<Numeric, _>(dec!(0.60))
1127 .bind::<Numeric, _>(dec!(0.04))
1128 .bind::<Numeric, _>(dec!(4.00))
1129 .bind::<Numeric, _>(dec!(0.80))
1130 .bind::<Numeric, _>(dec!(0.95))
1131 .execute(&mut conn)
1132 .unwrap();
1133
1134 diesel::sql_query(
1135 "INSERT INTO pm_settlement_accounts (
1136 wallet, underlying, status, timing_bridge_principal_usdc,
1137 settlement_debt_principal_usdc, accrued_interest_usdc,
1138 interest_cursor_ms, bridge_deadline_ms, active_recovery_plan_id,
1139 policy_version, last_engine_command_id, projection_seq
1140 ) VALUES ($1, 'ETH', 'Bridged', $2, $3, $4, 100, 200, 'plan-1', 2, 11, 12)",
1141 )
1142 .bind::<Bytea, _>(wallet.as_bytes())
1143 .bind::<Numeric, _>(dec!(75))
1144 .bind::<Numeric, _>(dec!(0))
1145 .bind::<Numeric, _>(dec!(3))
1146 .execute(&mut conn)
1147 .unwrap();
1148
1149 diesel::sql_query(
1150 "INSERT INTO pm_settlement_events (
1151 event_key, wallet, underlying, event_type, amount_usdc,
1152 request_id, input_digest, engine_command_id, projection_seq
1153 ) VALUES ('event-1', $1, 'ETH', 'TimingBridge', $2, $3, 'digest-1', 11, 12)",
1154 )
1155 .bind::<Bytea, _>(wallet.as_bytes())
1156 .bind::<Numeric, _>(dec!(75))
1157 .bind::<SqlUuid, _>(DbUuid(event_request_id))
1158 .execute(&mut conn)
1159 .unwrap();
1160
1161 diesel::sql_query(
1162 "INSERT INTO pm_settlement_interest_events (
1163 request_id, wallet, underlying, from_ms, to_ms, utilization, apr,
1164 interest_usdc, policy_version, engine_command_id, projection_seq
1165 ) VALUES ($1, $2, 'ETH', 100, 200, $3, $4, $5, 2, 11, 12)",
1166 )
1167 .bind::<SqlUuid, _>(DbUuid(interest_request_id))
1168 .bind::<Bytea, _>(wallet.as_bytes())
1169 .bind::<Numeric, _>(dec!(0.25))
1170 .bind::<Numeric, _>(dec!(0.04))
1171 .bind::<Numeric, _>(dec!(3))
1172 .execute(&mut conn)
1173 .unwrap();
1174
1175 diesel::sql_query(
1176 "INSERT INTO pm_settlement_repayment_events (
1177 request_id, wallet, underlying, amount_usdc, interest_paid_usdc,
1178 principal_paid_usdc, reason, source_event_id, engine_command_id, projection_seq
1179 ) VALUES ($1, $2, 'ETH', $3, $4, $5, 'manual', 'event-1', 11, 12)",
1180 )
1181 .bind::<SqlUuid, _>(DbUuid(repayment_request_id))
1182 .bind::<Bytea, _>(wallet.as_bytes())
1183 .bind::<Numeric, _>(dec!(10))
1184 .bind::<Numeric, _>(dec!(3))
1185 .bind::<Numeric, _>(dec!(7))
1186 .execute(&mut conn)
1187 .unwrap();
1188
1189 diesel::sql_query(
1190 "INSERT INTO pm_recovery_plans (
1191 plan_id, wallet, underlying, status, placeholder, engine_command_id, projection_seq
1192 ) VALUES ('plan-1', $1, 'ETH', 'Placeholder', true, 11, 12)",
1193 )
1194 .bind::<Bytea, _>(wallet.as_bytes())
1195 .execute(&mut conn)
1196 .unwrap();
1197
1198 diesel::sql_query(
1199 "INSERT INTO pm_recovery_actions (
1200 plan_id, action_id, action_type, status, engine_command_id, projection_seq
1201 ) VALUES ('plan-1', 'action-1', 'Placeholder', 'Pending', 11, 12)",
1202 )
1203 .execute(&mut conn)
1204 .unwrap();
1205
1206 let pools = db.list_pm_settlement_pools_sync().unwrap();
1207 assert_eq!(pools.len(), 1);
1208 assert_eq!(pools[0].pool_available_usdc, dec!(1000));
1209 assert_eq!(pools[0].pool_capacity_usdc, dec!(1200));
1210 let accounts = db.list_pm_settlement_accounts_sync().unwrap();
1211 assert_eq!(accounts[0].wallet, wallet);
1212 assert_eq!(accounts[0].timing_bridge_principal_usdc, dec!(75));
1213 assert_eq!(accounts[0].bridge_deadline_ms, Some(200));
1214 assert_eq!(
1215 accounts[0].active_recovery_plan_id.as_deref(),
1216 Some("plan-1")
1217 );
1218 assert_eq!(db.list_pm_settlement_events_sync().unwrap().len(), 1);
1219 assert_eq!(
1220 db.list_pm_settlement_interest_events_sync().unwrap()[0].request_id,
1221 interest_request_id
1222 );
1223 assert_eq!(
1224 db.list_pm_settlement_interest_events_sync().unwrap()[0].interest_usdc,
1225 dec!(3)
1226 );
1227 assert_eq!(
1228 db.list_pm_settlement_repayment_events_sync().unwrap()[0].request_id,
1229 repayment_request_id
1230 );
1231 assert_eq!(
1232 db.list_pm_settlement_repayment_events_sync().unwrap()[0].principal_paid_usdc,
1233 dec!(7)
1234 );
1235 assert_eq!(db.list_pm_recovery_plans_sync().unwrap().len(), 1);
1236 assert_eq!(db.list_pm_recovery_actions_sync().unwrap().len(), 1);
1237
1238 let async_db = test_db.diesel_db().await;
1239 assert_eq!(async_db.list_pm_settlement_pools().await.unwrap().len(), 1);
1240 assert_eq!(
1241 async_db.list_pm_settlement_accounts().await.unwrap()[0].wallet,
1242 wallet
1243 );
1244 assert_eq!(async_db.list_pm_settlement_events().await.unwrap().len(), 1);
1245 assert_eq!(
1246 async_db.list_pm_settlement_interest_events().await.unwrap()[0].interest_usdc,
1247 dec!(3)
1248 );
1249 assert_eq!(
1250 async_db
1251 .list_pm_settlement_repayment_events()
1252 .await
1253 .unwrap()[0]
1254 .principal_paid_usdc,
1255 dec!(7)
1256 );
1257 assert_eq!(async_db.list_pm_recovery_plans().await.unwrap().len(), 1);
1258 assert_eq!(async_db.list_pm_recovery_actions().await.unwrap().len(), 1);
1259 }
1260
1261 #[tokio::test]
1262 async fn pm_settlement_gate_counts_are_not_display_list_limited() {
1263 let test_db = TestDb::new().await.unwrap();
1264 let db = test_db.handler.as_ref();
1265 let mut conn = db.pool().get().unwrap();
1266
1267 diesel::sql_query(
1268 "INSERT INTO pm_settlement_pools (
1269 underlying, config_version, policy_version, pool_available_usdc,
1270 pool_target_usdc, pool_capacity_usdc, pool_utilization,
1271 active_timing_bridge_usdc, active_settlement_debt_usdc,
1272 normal_utilization_cap, crisis_utilization_cap,
1273 last_engine_command_id, projection_seq
1274 )
1275 SELECT
1276 'POOL' || gs::text,
1277 1,
1278 1,
1279 1,
1280 2,
1281 10,
1282 0.96,
1283 0,
1284 0,
1285 0.80,
1286 0.95,
1287 gs,
1288 gs
1289 FROM generate_series(1, 1001) AS gs",
1290 )
1291 .execute(&mut conn)
1292 .unwrap();
1293
1294 diesel::sql_query(
1295 "INSERT INTO pm_settlement_accounts (
1296 wallet, underlying, status, timing_bridge_principal_usdc,
1297 settlement_debt_principal_usdc, accrued_interest_usdc,
1298 interest_cursor_ms, bridge_deadline_ms, active_recovery_plan_id,
1299 policy_version, last_engine_command_id, projection_seq
1300 )
1301 SELECT
1302 decode(lpad(to_hex(gs), 40, '0'), 'hex'),
1303 'ETH',
1304 'Debt',
1305 1,
1306 2,
1307 0,
1308 100,
1309 200,
1310 'plan-' || gs::text,
1311 1,
1312 gs,
1313 gs
1314 FROM generate_series(1, 1001) AS gs",
1315 )
1316 .execute(&mut conn)
1317 .unwrap();
1318
1319 diesel::sql_query(
1320 "INSERT INTO pm_recovery_plans (
1321 plan_id, wallet, underlying, status, placeholder, engine_command_id, projection_seq
1322 )
1323 SELECT
1324 'plan-' || gs::text,
1325 decode(lpad(to_hex(gs), 40, '0'), 'hex'),
1326 'ETH',
1327 'Planned',
1328 false,
1329 gs,
1330 gs
1331 FROM generate_series(1, 1001) AS gs",
1332 )
1333 .execute(&mut conn)
1334 .unwrap();
1335
1336 diesel::sql_query(
1337 "INSERT INTO pm_recovery_actions (
1338 plan_id, action_id, action_type, status, engine_command_id, projection_seq
1339 )
1340 SELECT
1341 'plan-' || gs::text,
1342 'action-' || gs::text,
1343 'Transfer',
1344 'Submitted',
1345 gs,
1346 gs
1347 FROM generate_series(1, 1001) AS gs",
1348 )
1349 .execute(&mut conn)
1350 .unwrap();
1351
1352 let async_db = test_db.diesel_db().await;
1353 assert_eq!(
1354 async_db.list_pm_settlement_pools().await.unwrap().len(),
1355 1000
1356 );
1357 assert_eq!(
1358 async_db.list_pm_settlement_accounts().await.unwrap().len(),
1359 1000
1360 );
1361 assert_eq!(
1362 async_db.list_pm_recovery_actions().await.unwrap().len(),
1363 1000
1364 );
1365
1366 let pool_counts = async_db.pm_settlement_pool_gate_counts().await.unwrap();
1367 assert_eq!(pool_counts.total_pools, 1001);
1368 assert_eq!(pool_counts.below_target_pools, 1001);
1369 assert_eq!(pool_counts.above_crisis_cap_pools, 1001);
1370 assert_eq!(pool_counts.missing_utilization_pools, 0);
1371 assert_eq!(pool_counts.missing_crisis_cap_pools, 0);
1372
1373 let account_counts = async_db
1374 .pm_settlement_account_gate_counts(300)
1375 .await
1376 .unwrap();
1377 assert_eq!(account_counts.total_accounts, 1001);
1378 assert_eq!(account_counts.bridged_accounts, 1001);
1379 assert_eq!(account_counts.debt_accounts, 1001);
1380 assert_eq!(account_counts.overdue_bridge_accounts, 1001);
1381 assert_eq!(account_counts.active_recovery_accounts, 1001);
1382 assert_eq!(account_counts.active_bridge_usdc, dec!(1001));
1383 assert_eq!(account_counts.active_debt_usdc, dec!(2002));
1384
1385 let action_counts = async_db.pm_recovery_action_gate_counts().await.unwrap();
1386 assert_eq!(action_counts.total_actions, 1001);
1387 assert_eq!(action_counts.submitted_actions, 1001);
1388 }
1389
1390 #[tokio::test]
1391 async fn pm_settlement_gate_counts_fail_closed_for_unknown_bridge_deadlines() {
1392 let test_db = TestDb::new().await.unwrap();
1393 let db = test_db.handler.as_ref();
1394 let mut conn = db.pool().get().unwrap();
1395 let wallet = hypercall_types::wallet_address::test_wallet(170);
1396
1397 diesel::sql_query(
1398 "INSERT INTO pm_settlement_accounts (
1399 wallet, underlying, status, timing_bridge_principal_usdc,
1400 settlement_debt_principal_usdc, accrued_interest_usdc,
1401 interest_cursor_ms, bridge_deadline_ms, active_recovery_plan_id,
1402 policy_version, last_engine_command_id, projection_seq
1403 ) VALUES ($1, 'ETH', 'Bridged', $2, 0, 0, 100, NULL, NULL, 1, 1, 1)",
1404 )
1405 .bind::<Bytea, _>(wallet.as_bytes())
1406 .bind::<Numeric, _>(dec!(10))
1407 .execute(&mut conn)
1408 .unwrap();
1409
1410 let async_db = test_db.diesel_db().await;
1411 let account_counts = async_db
1412 .pm_settlement_account_gate_counts(300)
1413 .await
1414 .unwrap();
1415 assert_eq!(account_counts.bridged_accounts, 1);
1416 assert_eq!(account_counts.overdue_bridge_accounts, 1);
1417 }
1418
1419 #[tokio::test]
1420 async fn pm_settlement_gate_counts_ignore_repaid_recovery_markers() {
1421 let test_db = TestDb::new().await.unwrap();
1422 let db = test_db.handler.as_ref();
1423 let mut conn = db.pool().get().unwrap();
1424 let active_wallet = hypercall_types::wallet_address::test_wallet(171);
1425 let repaid_wallet = hypercall_types::wallet_address::test_wallet(172);
1426
1427 diesel::sql_query(
1428 "INSERT INTO pm_settlement_accounts (
1429 wallet, underlying, status, timing_bridge_principal_usdc,
1430 settlement_debt_principal_usdc, accrued_interest_usdc,
1431 interest_cursor_ms, bridge_deadline_ms, active_recovery_plan_id,
1432 policy_version, last_engine_command_id, projection_seq
1433 ) VALUES
1434 ($1, 'ETH', 'Debt', 0, $2, 0, 100, NULL, 'active-plan', 1, 1, 1),
1435 ($3, 'ETH', 'Current', 0, 0, 0, 100, NULL, 'repaid-plan', 1, 2, 2)",
1436 )
1437 .bind::<Bytea, _>(active_wallet.as_bytes())
1438 .bind::<Numeric, _>(dec!(10))
1439 .bind::<Bytea, _>(repaid_wallet.as_bytes())
1440 .execute(&mut conn)
1441 .unwrap();
1442
1443 let async_db = test_db.diesel_db().await;
1444 let account_counts = async_db
1445 .pm_settlement_account_gate_counts(300)
1446 .await
1447 .unwrap();
1448 assert_eq!(account_counts.total_accounts, 2);
1449 assert_eq!(account_counts.debt_accounts, 1);
1450 assert_eq!(account_counts.active_recovery_accounts, 1);
1451 }
1452
1453 #[tokio::test]
1454 async fn pm_settlement_projection_writer_is_idempotent_and_transactional() {
1455 let test_db = TestDb::new().await.unwrap();
1456 let db = test_db.handler.as_ref();
1457 let wallet = hypercall_types::wallet_address::test_wallet(169);
1458 let interest_request_id = uuid::Uuid::new_v4();
1459
1460 let pool_write = hypercall_db::PmSettlementProjectionWrite::Pool(
1461 hypercall_db::PmSettlementPoolProjectionWrite {
1462 underlying: "ETH".to_string(),
1463 config_version: 1,
1464 policy_version: 1,
1465 pool_available_usdc: dec!(1000),
1466 pool_target_usdc: dec!(1500),
1467 pool_capacity_usdc: dec!(1100),
1468 pool_utilization: Some(dec!(0.10)),
1469 active_timing_bridge_usdc: dec!(100),
1470 active_settlement_debt_usdc: dec!(0),
1471 target_short_oi_notional_multiplier: Some(dec!(0.20)),
1472 utilization_kink: Some(dec!(0.60)),
1473 apr_at_kink: Some(dec!(0.04)),
1474 max_apr: Some(dec!(4.00)),
1475 normal_utilization_cap: Some(dec!(0.80)),
1476 crisis_utilization_cap: Some(dec!(0.95)),
1477 bridge_window_ms: Some(3_600_000),
1478 projection_seq: 100,
1479 updated_at_ms: 100,
1480 },
1481 );
1482 let account_write = hypercall_db::PmSettlementProjectionWrite::Account(
1483 hypercall_db::PmSettlementAccountProjectionWrite {
1484 wallet,
1485 underlying: "ETH".to_string(),
1486 status: "Bridged".to_string(),
1487 timing_bridge_principal_usdc: dec!(50),
1488 settlement_debt_principal_usdc: dec!(0),
1489 accrued_interest_usdc: dec!(2),
1490 interest_cursor_ms: 100,
1491 bridge_deadline_ms: Some(200),
1492 active_recovery_plan_id: Some("plan-169".to_string()),
1493 policy_version: 1,
1494 projection_seq: 100,
1495 updated_at_ms: 100,
1496 },
1497 );
1498 let interest_write = hypercall_db::PmSettlementProjectionWrite::InterestEvent(
1499 hypercall_db::PmSettlementInterestEventProjectionWrite {
1500 request_id: interest_request_id,
1501 wallet,
1502 underlying: "ETH".to_string(),
1503 from_ms: 100,
1504 to_ms: 200,
1505 utilization: dec!(0.10),
1506 apr: dec!(0.04),
1507 interest_usdc: dec!(2),
1508 policy_version: 1,
1509 },
1510 );
1511 let recovery_write = hypercall_db::PmSettlementProjectionWrite::RecoveryPlan(
1512 hypercall_db::PmRecoveryPlanProjectionWrite {
1513 plan_id: "plan-169".to_string(),
1514 wallet,
1515 underlying: "ETH".to_string(),
1516 status: "Confirmed".to_string(),
1517 trigger: "UtilizationBreach".to_string(),
1518 reason: "PoolUtilizationBreach".to_string(),
1519 policy_version: 1,
1520 recovery_priority_version: 1,
1521 target_reduction_usdc: dec!(10),
1522 expected_usdc_recovered: dec!(10),
1523 expected_obligation_reduced: dec!(0),
1524 expected_impact_usdc: dec!(1),
1525 post_plan_utilization: Some(dec!(0.09)),
1526 projection_seq: 300,
1527 updated_at_ms: 300,
1528 actions: vec![hypercall_db::PmRecoveryActionProjectionWrite {
1529 plan_id: "plan-169".to_string(),
1530 action_id: "plan-169:0".to_string(),
1531 action_type: "CancelOrder".to_string(),
1532 status: "Confirmed".to_string(),
1533 target: "order-1".to_string(),
1534 attempt: 1,
1535 external_id: Some("directive-1".to_string()),
1536 external_kind: Some("Directive".to_string()),
1537 result_external_id: Some("fill-1".to_string()),
1538 result: Some("Confirmed".to_string()),
1539 expected_usdc_recovered: dec!(10),
1540 expected_obligation_reduced: dec!(0),
1541 expected_impact_usdc: dec!(1),
1542 recovered_usdc: dec!(10),
1543 liability_reduction_usdc: dec!(0),
1544 projection_seq: 300,
1545 updated_at_ms: 300,
1546 }],
1547 },
1548 );
1549 let deposit_id = uuid::Uuid::new_v4();
1550 let withdrawal_id = uuid::Uuid::new_v4();
1551 let vault_deposit_write = hypercall_db::PmSettlementProjectionWrite::VaultDeposit(
1552 hypercall_db::PmVaultDepositProjectionWrite {
1553 deposit_id,
1554 depositor: wallet,
1555 underlying: "ETH".to_string(),
1556 principal_usdc: dec!(500),
1557 remaining_usdc: dec!(400),
1558 withdrawn_usdc: dec!(0),
1559 reserved_withdrawal_usdc: dec!(100),
1560 chain_id: 42161,
1561 source_contract_address: wallet,
1562 tx_hash: "0xdeposit".to_string(),
1563 log_index: 7,
1564 max_listed_expiry_ts_ms: 1_800_000_000_000,
1565 settlement_grace_ms: 86_400_000,
1566 lock_until_ms: 1_800_086_400_000,
1567 status: hypercall_db::PmVaultDepositProjectionStatus::PartiallyReserved,
1568 projection_seq: 400,
1569 created_at_ms: 300,
1570 updated_at_ms: 400,
1571 },
1572 );
1573 let vault_withdrawal_write = hypercall_db::PmSettlementProjectionWrite::VaultWithdrawal(
1574 hypercall_db::PmVaultWithdrawalProjectionWrite {
1575 withdrawal_id,
1576 deposit_id,
1577 depositor: wallet,
1578 underlying: "ETH".to_string(),
1579 amount_usdc: dec!(100),
1580 lock_until_ms: 1_800_086_400_000,
1581 status: hypercall_db::PmVaultWithdrawalProjectionStatus::Reserved,
1582 projection_seq: 400,
1583 requested_at_ms: 1_800_086_400_000,
1584 updated_at_ms: 1_800_086_400_000,
1585 },
1586 );
1587
1588 db.apply_pm_settlement_projection_writes_sync(&[
1589 pool_write.clone(),
1590 account_write.clone(),
1591 interest_write.clone(),
1592 recovery_write.clone(),
1593 vault_deposit_write.clone(),
1594 vault_withdrawal_write.clone(),
1595 ])
1596 .unwrap();
1597 db.apply_pm_settlement_projection_writes_sync(&[
1598 pool_write.clone(),
1599 account_write.clone(),
1600 interest_write,
1601 recovery_write,
1602 vault_deposit_write,
1603 vault_withdrawal_write,
1604 ])
1605 .unwrap();
1606
1607 assert_eq!(db.list_pm_settlement_pools_sync().unwrap().len(), 1);
1608 let accounts = db.list_pm_settlement_accounts_sync().unwrap();
1609 assert_eq!(accounts.len(), 1);
1610 assert_eq!(accounts[0].bridge_deadline_ms, Some(200));
1611 assert_eq!(
1612 accounts[0].active_recovery_plan_id.as_deref(),
1613 Some("plan-169")
1614 );
1615
1616 let stale_account_write = hypercall_db::PmSettlementProjectionWrite::Account(
1617 hypercall_db::PmSettlementAccountProjectionWrite {
1618 wallet,
1619 underlying: "ETH".to_string(),
1620 status: "Debt".to_string(),
1621 timing_bridge_principal_usdc: dec!(0),
1622 settlement_debt_principal_usdc: dec!(999),
1623 accrued_interest_usdc: dec!(999),
1624 interest_cursor_ms: 1,
1625 bridge_deadline_ms: None,
1626 active_recovery_plan_id: None,
1627 policy_version: 1,
1628 projection_seq: 99,
1629 updated_at_ms: 99,
1630 },
1631 );
1632 db.apply_pm_settlement_projection_writes_sync(&[stale_account_write])
1633 .unwrap();
1634 let accounts = db.list_pm_settlement_accounts_sync().unwrap();
1635 assert_eq!(accounts[0].status, "Bridged");
1636 assert_eq!(accounts[0].timing_bridge_principal_usdc, dec!(50));
1637 assert_eq!(accounts[0].settlement_debt_principal_usdc, dec!(0));
1638 assert_eq!(accounts[0].accrued_interest_usdc, dec!(2));
1639 assert_eq!(accounts[0].interest_cursor_ms, 100);
1640 assert_eq!(accounts[0].bridge_deadline_ms, Some(200));
1641 assert_eq!(
1642 accounts[0].active_recovery_plan_id.as_deref(),
1643 Some("plan-169")
1644 );
1645 assert_eq!(accounts[0].projection_seq, 100);
1646
1647 let stale_pool_write = hypercall_db::PmSettlementProjectionWrite::Pool(
1648 hypercall_db::PmSettlementPoolProjectionWrite {
1649 underlying: "ETH".to_string(),
1650 config_version: 1,
1651 policy_version: 1,
1652 pool_available_usdc: dec!(999),
1653 pool_target_usdc: dec!(999),
1654 pool_capacity_usdc: dec!(999),
1655 pool_utilization: Some(dec!(0.99)),
1656 active_timing_bridge_usdc: dec!(999),
1657 active_settlement_debt_usdc: dec!(999),
1658 target_short_oi_notional_multiplier: Some(dec!(0.99)),
1659 utilization_kink: Some(dec!(0.99)),
1660 apr_at_kink: Some(dec!(0.99)),
1661 max_apr: Some(dec!(9.99)),
1662 normal_utilization_cap: Some(dec!(0.99)),
1663 crisis_utilization_cap: Some(dec!(0.99)),
1664 bridge_window_ms: Some(99),
1665 projection_seq: 99,
1666 updated_at_ms: 99,
1667 },
1668 );
1669 db.apply_pm_settlement_projection_writes_sync(&[stale_pool_write])
1670 .unwrap();
1671 let pools = db.list_pm_settlement_pools_sync().unwrap();
1672 assert_eq!(pools[0].pool_available_usdc, dec!(1000));
1673 assert_eq!(pools[0].pool_target_usdc, dec!(1500));
1674 assert_eq!(pools[0].pool_capacity_usdc, dec!(1100));
1675 assert_eq!(pools[0].pool_utilization, Some(dec!(0.10)));
1676 assert_eq!(pools[0].active_timing_bridge_usdc, dec!(100));
1677 assert_eq!(pools[0].projection_seq, 100);
1678
1679 let interest_events = db.list_pm_settlement_interest_events_sync().unwrap();
1680 assert_eq!(interest_events.len(), 1);
1681 assert_eq!(interest_events[0].request_id, interest_request_id);
1682 let recovery_actions = db.list_pm_recovery_actions_sync().unwrap();
1683 assert_eq!(recovery_actions.len(), 1);
1684 assert_eq!(
1685 recovery_actions[0].result_external_id.as_deref(),
1686 Some("fill-1")
1687 );
1688 #[derive(QueryableByName)]
1689 struct VaultProjectionCounts {
1690 #[diesel(sql_type = BigInt)]
1691 deposits: i64,
1692 #[diesel(sql_type = BigInt)]
1693 withdrawals: i64,
1694 }
1695 #[derive(QueryableByName)]
1696 struct VaultDepositProjectionRow {
1697 #[diesel(sql_type = Numeric)]
1698 remaining_usdc: rust_decimal::Decimal,
1699 #[diesel(sql_type = Numeric)]
1700 withdrawn_usdc: rust_decimal::Decimal,
1701 #[diesel(sql_type = Numeric)]
1702 reserved_withdrawal_usdc: rust_decimal::Decimal,
1703 #[diesel(sql_type = Text)]
1704 status: String,
1705 #[diesel(sql_type = BigInt)]
1706 projection_seq: i64,
1707 }
1708 let mut conn = db.pool().get().unwrap();
1709 let vault_counts: VaultProjectionCounts = diesel::sql_query(
1710 "SELECT
1711 (SELECT COUNT(*) FROM pm_vault_deposits)::bigint AS deposits,
1712 (SELECT COUNT(*) FROM pm_vault_withdrawals)::bigint AS withdrawals",
1713 )
1714 .get_result(&mut conn)
1715 .unwrap();
1716 assert_eq!(vault_counts.deposits, 1);
1717 assert_eq!(vault_counts.withdrawals, 1);
1718
1719 let stale_vault_deposit_write = hypercall_db::PmSettlementProjectionWrite::VaultDeposit(
1720 hypercall_db::PmVaultDepositProjectionWrite {
1721 deposit_id,
1722 depositor: wallet,
1723 underlying: "ETH".to_string(),
1724 principal_usdc: dec!(500),
1725 remaining_usdc: dec!(0),
1726 withdrawn_usdc: dec!(500),
1727 reserved_withdrawal_usdc: dec!(0),
1728 chain_id: 42161,
1729 source_contract_address: wallet,
1730 tx_hash: "0xdeposit".to_string(),
1731 log_index: 7,
1732 max_listed_expiry_ts_ms: 1_800_000_000_000,
1733 settlement_grace_ms: 86_400_000,
1734 lock_until_ms: 1_800_086_400_000,
1735 status: hypercall_db::PmVaultDepositProjectionStatus::Reserved,
1736 projection_seq: 399,
1737 created_at_ms: 300,
1738 updated_at_ms: 399,
1739 },
1740 );
1741 db.apply_pm_settlement_projection_writes_sync(&[stale_vault_deposit_write])
1742 .unwrap();
1743 let stale_checked_deposit: VaultDepositProjectionRow = diesel::sql_query(
1744 "SELECT
1745 remaining_usdc,
1746 withdrawn_usdc,
1747 reserved_withdrawal_usdc,
1748 status::text AS status,
1749 projection_seq
1750 FROM pm_vault_deposits
1751 WHERE deposit_id = $1",
1752 )
1753 .bind::<SqlUuid, _>(DbUuid(deposit_id))
1754 .get_result(&mut conn)
1755 .unwrap();
1756 assert_eq!(stale_checked_deposit.remaining_usdc, dec!(400));
1757 assert_eq!(stale_checked_deposit.withdrawn_usdc, dec!(0));
1758 assert_eq!(stale_checked_deposit.reserved_withdrawal_usdc, dec!(100));
1759 assert_eq!(stale_checked_deposit.status, "PartiallyReserved");
1760 assert_eq!(stale_checked_deposit.projection_seq, 400);
1761
1762 let invalid_repayment = hypercall_db::PmSettlementProjectionWrite::RepaymentEvent(
1763 hypercall_db::PmSettlementRepaymentEventProjectionWrite {
1764 request_id: uuid::Uuid::new_v4(),
1765 wallet,
1766 underlying: "BTC".to_string(),
1767 amount_usdc: dec!(-1),
1768 interest_paid_usdc: dec!(0),
1769 principal_paid_usdc: dec!(0),
1770 reason: "manual".to_string(),
1771 source_event_id: "event-rollback".to_string(),
1772 },
1773 );
1774 let rollback_pool = hypercall_db::PmSettlementProjectionWrite::Pool(
1775 hypercall_db::PmSettlementPoolProjectionWrite {
1776 underlying: "BTC".to_string(),
1777 config_version: 1,
1778 policy_version: 1,
1779 pool_available_usdc: dec!(100),
1780 pool_target_usdc: dec!(100),
1781 pool_capacity_usdc: dec!(100),
1782 pool_utilization: Some(dec!(0)),
1783 active_timing_bridge_usdc: dec!(0),
1784 active_settlement_debt_usdc: dec!(0),
1785 target_short_oi_notional_multiplier: Some(dec!(0.20)),
1786 utilization_kink: Some(dec!(0.60)),
1787 apr_at_kink: Some(dec!(0.04)),
1788 max_apr: Some(dec!(4.00)),
1789 normal_utilization_cap: Some(dec!(0.80)),
1790 crisis_utilization_cap: Some(dec!(0.95)),
1791 bridge_window_ms: Some(3_600_000),
1792 projection_seq: 200,
1793 updated_at_ms: 200,
1794 },
1795 );
1796
1797 let result =
1798 db.apply_pm_settlement_projection_writes_sync(&[rollback_pool, invalid_repayment]);
1799 assert!(result.is_err());
1800 let pools = db.list_pm_settlement_pools_sync().unwrap();
1801 assert_eq!(pools.len(), 1);
1802 assert_eq!(pools[0].underlying, "ETH");
1803 assert!(db
1804 .list_pm_settlement_repayment_events_sync()
1805 .unwrap()
1806 .is_empty());
1807 }
1808}