Skip to main content

hypercall/rsm/portfolio_margin/
settlement_state.rs

1use hypercall_engine::command::{
2    AccruePmSettlementInterestCommand, ApplyPmSettlementRepaymentCommand,
3    JournalPmRecoveryPlanCommand, MarkPmRecoveryActionSubmittedCommand, PmRecoveryActionKind,
4    PmRecoveryActionResult, PmRecoveryReason, PmSettlementEventKey, RecordPmVaultDepositCommand,
5    RequestPmVaultWithdrawalCommand, ResolvePmRecoveryActionCommand,
6    SetPmSettlementPoolConfigCommand, TickExpiryPmSettlement,
7};
8use hypercall_margin::portfolio::{
9    classify_liquidity_gap, pool_capacity_usdc, pool_utilization, utilization_apr,
10    validate_pool_config, PmLiquidityClassification, PmSettlementPoolConfig,
11    PmSettlementPoolSnapshot,
12};
13use hypercall_types::WalletAddress;
14use rust_decimal::Decimal;
15use serde::{Deserialize, Serialize};
16use std::collections::BTreeMap;
17use uuid::Uuid;
18
19const INTEREST_YEAR_MS: u64 = 365 * 24 * 60 * 60 * 1000;
20
21#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
22pub struct PmSettlementState {
23    pub pools: BTreeMap<String, PmSettlementPoolState>,
24    pub accounts: BTreeMap<PmSettlementAccountKey, PmSettlementAccountState>,
25    /// Durable classification history retained for replay idempotency.
26    ///
27    /// TODO: move this out of serialized engine snapshot state. Cardinality
28    /// grows with the historical settlement cross-product of PM wallets,
29    /// expiring markets, expiry timestamps, margin modes, and settlement event
30    /// sequences. The long-term shape should keep only active settlement state
31    /// in `PmSettlementState` and put historical event/request dedupe behind a
32    /// bounded cursor, a gap-buffered stream index, or an engine-owned durable
33    /// KV store such as RocksDB.
34    pub events: BTreeMap<PmSettlementEventKey, PmSettlementEventState>,
35    /// Request-id dedupe for PM settlement commands.
36    ///
37    /// TODO: remove this from snapshot state with `events`. It grows with
38    /// historical command count rather than active accounts, so it should become
39    /// a bounded replay-protection index instead of an in-memory audit log.
40    pub idempotency: BTreeMap<String, PmSettlementRequestRecord>,
41    #[serde(default)]
42    pub vault_deposits: BTreeMap<Uuid, PmVaultDepositState>,
43    #[serde(default)]
44    pub vault_withdrawals: BTreeMap<Uuid, PmVaultWithdrawalState>,
45    // CALL-1900: AWS staging replay bridge for snapshots written before PM
46    // recovery plans were appended. Remove after staging boots once, writes a
47    // current snapshot, and restarts without unsafe snapshot replay.
48    #[serde(default)]
49    pub recovery_plans: BTreeMap<String, PmRecoveryPlanState>,
50}
51
52#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
53pub struct PmSettlementAccountKey {
54    pub wallet: WalletAddress,
55    pub underlying: String,
56}
57
58#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
59pub struct PmSettlementPoolState {
60    pub underlying: String,
61    pub pool_available_usdc: Decimal,
62    pub pool_target_usdc: Decimal,
63    pub active_timing_bridge_usdc: Decimal,
64    pub active_settlement_debt_usdc: Decimal,
65    pub config_version: u32,
66    pub policy_version: u32,
67    pub config: Option<PmSettlementPoolConfig>,
68    pub utilization: Option<Decimal>,
69    pub updated_at_ms: u64,
70}
71
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
73pub enum PmSettlementAccountStatus {
74    Clean,
75    Bridged,
76    Debt,
77    Resolved,
78}
79
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct PmSettlementAccountState {
82    pub wallet: WalletAddress,
83    pub underlying: String,
84    pub bridge_principal_usdc: Decimal,
85    pub debt_principal_usdc: Decimal,
86    pub accrued_interest_usdc: Decimal,
87    pub last_interest_accrual_ms: i64,
88    pub policy_version: u32,
89    pub bridge_deadline_ms: Option<i64>,
90    pub status: PmSettlementAccountStatus,
91    pub active_recovery_plan_id: Option<String>,
92    pub updated_at_ms: u64,
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
96pub struct PmSettlementEventState {
97    pub event_key: PmSettlementEventKey,
98    pub input_digest: String,
99    pub output_digest: String,
100    pub status: String,
101    pub request_id: Uuid,
102    pub underlying: String,
103    pub event_type: String,
104    pub amount_usdc: Decimal,
105}
106
107#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
108pub struct PmSettlementRequestRecord {
109    pub request_id: Uuid,
110    pub input_digest: String,
111    pub output_digest: String,
112    pub command_type: String,
113    pub applied_at_ms: u64,
114}
115
116#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
117pub struct PmVaultDepositState {
118    pub deposit_id: Uuid,
119    pub depositor: WalletAddress,
120    pub underlying: String,
121    pub principal_usdc: Decimal,
122    pub remaining_usdc: Decimal,
123    pub withdrawn_usdc: Decimal,
124    pub reserved_withdrawal_usdc: Decimal,
125    pub chain_id: u64,
126    pub source_contract_address: WalletAddress,
127    pub tx_hash: String,
128    pub log_index: u32,
129    pub max_listed_expiry_ts_ms: i64,
130    pub settlement_grace_ms: i64,
131    pub lock_until_ms: i64,
132    pub status: PmVaultDepositStatus,
133    pub created_at_ms: u64,
134    pub updated_at_ms: u64,
135}
136
137#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
138pub enum PmVaultDepositStatus {
139    Active,
140    PartiallyReserved,
141    Reserved,
142}
143
144#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
145pub struct PmVaultWithdrawalState {
146    pub withdrawal_id: Uuid,
147    pub deposit_id: Uuid,
148    pub depositor: WalletAddress,
149    pub underlying: String,
150    pub amount_usdc: Decimal,
151    pub lock_until_ms: i64,
152    pub status: PmVaultWithdrawalStatus,
153    pub requested_at_ms: u64,
154    pub updated_at_ms: u64,
155}
156
157#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
158pub enum PmVaultWithdrawalStatus {
159    Reserved,
160}
161
162#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
163pub struct PmRecoveryPlanState {
164    pub plan_id: String,
165    pub wallet: WalletAddress,
166    pub underlying: String,
167    pub trigger: String,
168    pub reason: String,
169    pub policy_version: u32,
170    pub recovery_priority_version: u32,
171    pub status: String,
172    pub input_digest: String,
173    pub target_reduction_usdc: Decimal,
174    pub expected_usdc_recovered: Decimal,
175    pub expected_obligation_reduced: Decimal,
176    pub expected_impact_usdc: Decimal,
177    pub post_plan_utilization: Option<Decimal>,
178    pub actions: Vec<PmRecoveryActionState>,
179    pub updated_at_ms: u64,
180}
181
182#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
183pub struct PmRecoveryActionState {
184    pub action_index: u32,
185    pub action_type: String,
186    pub target: String,
187    pub action_payload: String,
188    pub status: String,
189    pub expected_usdc_recovered: Decimal,
190    pub expected_obligation_reduced: Decimal,
191    pub expected_impact_usdc: Decimal,
192    pub attempt: u32,
193    pub submitted_external_id: Option<String>,
194    pub external_kind: Option<String>,
195    pub result: Option<String>,
196    pub result_external_id: Option<String>,
197    pub recovered_usdc: Decimal,
198    pub liability_reduction_usdc: Decimal,
199    pub updated_at_ms: u64,
200}
201
202#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
203pub enum PmSettlementProjectionEffect {
204    PoolUpsert(PmSettlementPoolState),
205    AccountUpsert(PmSettlementAccountState),
206    InterestEvent(PmSettlementInterestEvent),
207    RepaymentEvent(PmSettlementRepaymentEvent),
208    VaultDepositUpsert(PmVaultDepositState),
209    VaultWithdrawalUpsert(PmVaultWithdrawalState),
210    EventUpsert(PmSettlementEventState),
211    RecoveryPlanUpsert(PmRecoveryPlanState),
212}
213
214#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
215pub struct PmSettlementInterestEvent {
216    pub request_id: Uuid,
217    pub wallet: WalletAddress,
218    pub underlying: String,
219    pub accrual_start_ms: i64,
220    pub accrual_end_ms: i64,
221    pub utilization: Decimal,
222    pub apr: Decimal,
223    pub interest_amount_usdc: Decimal,
224    pub policy_version: u32,
225}
226
227#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
228pub struct PmSettlementRepaymentEvent {
229    pub request_id: Uuid,
230    pub wallet: WalletAddress,
231    pub underlying: String,
232    pub source_event_id: String,
233    pub amount_usdc: Decimal,
234    pub interest_paid_usdc: Decimal,
235    pub principal_paid_usdc: Decimal,
236    pub pool_replenished_usdc: Decimal,
237    pub reason: String,
238    pub event_ts_ms: u64,
239}
240
241impl PmSettlementState {
242    pub fn apply_classify_settlement(
243        &mut self,
244        command: TickExpiryPmSettlement,
245        timestamp_ms: u64,
246    ) -> Result<Vec<PmSettlementProjectionEffect>, String> {
247        validate_request_identity(
248            command.request_id,
249            &command.facts_digest,
250            "ClassifyPmSettlement",
251        )?;
252        if let Some(existing) = self.idempotent_duplicate(
253            command.request_id,
254            &command.facts_digest,
255            "ClassifyPmSettlement",
256        )? {
257            return Ok(existing);
258        }
259        if let Some(existing) = self.events.get(&command.event_key) {
260            if existing.input_digest != command.facts_digest {
261                return Err(format!(
262                    "PM settlement event digest mismatch for {}",
263                    format_event_key(&command.event_key)
264                ));
265            }
266            self.record_request(
267                command.request_id,
268                command.facts_digest,
269                "ClassifyPmSettlement",
270                timestamp_ms,
271            );
272            return Ok(Vec::new());
273        }
274
275        let classification = match command.unavailable_reason.as_ref() {
276            Some(_) => PmLiquidityClassification::Unavailable,
277            None => {
278                let facts = command.pm_facts.as_ref().ok_or_else(|| {
279                    "ClassifyPmSettlement missing PM facts without unavailable reason".to_string()
280                })?;
281                if facts.wallet != command.wallet {
282                    return Err(
283                        "ClassifyPmSettlement PM facts wallet mismatch for settlement command"
284                            .to_string(),
285                    );
286                }
287                if facts.underlying != command.underlying {
288                    return Err(
289                        "ClassifyPmSettlement PM facts underlying mismatch for settlement command"
290                            .to_string(),
291                    );
292                }
293                if facts.liquid_usdc != command.liquid_usdc {
294                    return Err(
295                        "ClassifyPmSettlement liquid_usdc mismatch between command and PM facts"
296                            .to_string(),
297                    );
298                }
299                let pool_snapshot = command.pool_snapshot.as_ref().ok_or_else(|| {
300                    "ClassifyPmSettlement missing pool snapshot without unavailable reason"
301                        .to_string()
302                })?;
303                let pool = self
304                    .pools
305                    .get(&command.underlying)
306                    .ok_or_else(|| format!("unknown PM settlement pool {}", command.underlying))?;
307                if pool.snapshot() != *pool_snapshot {
308                    return Err(format!(
309                        "PM settlement pool snapshot mismatch for {}",
310                        command.underlying
311                    ));
312                }
313                let config = pool.config.as_ref().ok_or_else(|| {
314                    format!("missing PM settlement config {}", command.underlying)
315                })?;
316                classify_liquidity_gap(&command.obligation, facts, pool_snapshot, config)
317                    .map_err(|error| error.to_string())?
318            }
319        };
320
321        let shortfall =
322            (command.settlement_obligation_usdc - command.liquid_usdc).max(Decimal::ZERO);
323        let mut effects = Vec::new();
324        let event = PmSettlementEventState {
325            event_key: command.event_key.clone(),
326            input_digest: command.facts_digest.clone(),
327            output_digest: format!("{classification:?}"),
328            status: format!("{classification:?}"),
329            request_id: command.request_id,
330            underlying: command.underlying.clone(),
331            event_type: "ClassifyPmSettlement".to_string(),
332            amount_usdc: shortfall,
333        };
334
335        match classification {
336            PmLiquidityClassification::Paid | PmLiquidityClassification::Unavailable => {}
337            PmLiquidityClassification::TimingBridge | PmLiquidityClassification::SettlementDebt => {
338                if shortfall <= Decimal::ZERO {
339                    return Err(
340                        "PM settlement liability classification requires positive shortfall"
341                            .to_string(),
342                    );
343                }
344
345                let pool = self
346                    .pools
347                    .get_mut(&command.underlying)
348                    .ok_or_else(|| format!("unknown PM settlement pool {}", command.underlying))?;
349                if pool.pool_available_usdc < shortfall {
350                    return Err("PM settlement pool cannot front full shortfall".to_string());
351                }
352                let bridge_window_ms = pool
353                    .config
354                    .as_ref()
355                    .ok_or_else(|| format!("missing PM settlement config {}", command.underlying))?
356                    .bridge_window_ms;
357                pool.pool_available_usdc -= shortfall;
358                match classification {
359                    PmLiquidityClassification::TimingBridge => {
360                        pool.active_timing_bridge_usdc += shortfall;
361                    }
362                    PmLiquidityClassification::SettlementDebt => {
363                        pool.active_settlement_debt_usdc += shortfall;
364                    }
365                    PmLiquidityClassification::Paid | PmLiquidityClassification::Unavailable => {}
366                }
367                pool.updated_at_ms = timestamp_ms;
368                pool.recompute_utilization()?;
369
370                let key = PmSettlementAccountKey {
371                    wallet: command.wallet,
372                    underlying: command.underlying.clone(),
373                };
374                let account = self.accounts.entry(key).or_insert_with(|| {
375                    PmSettlementAccountState::clean(
376                        command.wallet,
377                        command.underlying.clone(),
378                        command.policy_version,
379                        timestamp_ms,
380                    )
381                });
382                if account.principal_liability_usdc() == Decimal::ZERO
383                    && account.accrued_interest_usdc == Decimal::ZERO
384                {
385                    account.last_interest_accrual_ms = timestamp_ms as i64;
386                }
387                match classification {
388                    PmLiquidityClassification::TimingBridge => {
389                        account.bridge_principal_usdc += shortfall;
390                        account.bridge_deadline_ms = Some(timestamp_ms as i64 + bridge_window_ms);
391                    }
392                    PmLiquidityClassification::SettlementDebt => {
393                        account.debt_principal_usdc += shortfall;
394                    }
395                    PmLiquidityClassification::Paid | PmLiquidityClassification::Unavailable => {}
396                }
397                account.policy_version = command.policy_version;
398                account.status = account.status_after_repayment();
399                account.updated_at_ms = timestamp_ms;
400
401                effects.push(PmSettlementProjectionEffect::PoolUpsert(pool.clone()));
402                effects.push(PmSettlementProjectionEffect::AccountUpsert(account.clone()));
403
404                let should_create_debt_plan =
405                    if classification == PmLiquidityClassification::SettlementDebt {
406                        account
407                            .active_recovery_plan_id
408                            .as_deref()
409                            .map(|plan_id| {
410                                self.recovery_plans
411                                    .get(plan_id)
412                                    .map(|plan| !recovery_plan_blocks_new_plan(plan))
413                                    .unwrap_or(true)
414                            })
415                            .unwrap_or(true)
416                    } else {
417                        false
418                    };
419                if should_create_debt_plan {
420                    let plan = PmRecoveryPlanState {
421                        plan_id: format!("pm-recovery-{}", command.request_id),
422                        wallet: command.wallet,
423                        underlying: command.underlying.clone(),
424                        trigger: "settlement_debt_opened".to_string(),
425                        reason: "PM settlement classified as debt".to_string(),
426                        policy_version: command.policy_version,
427                        recovery_priority_version: 1,
428                        status: "EscalatedManual".to_string(),
429                        input_digest: command.facts_digest.clone(),
430                        target_reduction_usdc: shortfall,
431                        expected_usdc_recovered: Decimal::ZERO,
432                        expected_obligation_reduced: Decimal::ZERO,
433                        expected_impact_usdc: Decimal::ZERO,
434                        post_plan_utilization: pool.utilization,
435                        actions: vec![PmRecoveryActionState {
436                            action_index: 0,
437                            action_type: "EscalateManual".to_string(),
438                            target: "operator".to_string(),
439                            action_payload: recovery_action_payload(
440                                &PmRecoveryActionKind::EscalateManual {
441                                    reason: PmRecoveryReason::SettlementDebt,
442                                },
443                            ),
444                            status: "EscalatedManual".to_string(),
445                            expected_usdc_recovered: Decimal::ZERO,
446                            expected_obligation_reduced: Decimal::ZERO,
447                            expected_impact_usdc: Decimal::ZERO,
448                            attempt: 0,
449                            submitted_external_id: None,
450                            external_kind: None,
451                            result: None,
452                            result_external_id: None,
453                            recovered_usdc: Decimal::ZERO,
454                            liability_reduction_usdc: Decimal::ZERO,
455                            updated_at_ms: timestamp_ms,
456                        }],
457                        updated_at_ms: timestamp_ms,
458                    };
459                    account.active_recovery_plan_id = Some(plan.plan_id.clone());
460                    account.updated_at_ms = timestamp_ms;
461                    self.recovery_plans
462                        .insert(plan.plan_id.clone(), plan.clone());
463                    effects.push(PmSettlementProjectionEffect::AccountUpsert(account.clone()));
464                    effects.push(PmSettlementProjectionEffect::RecoveryPlanUpsert(plan));
465                }
466            }
467        }
468
469        if classification != PmLiquidityClassification::Unavailable {
470            self.events.insert(command.event_key.clone(), event.clone());
471        }
472        effects.push(PmSettlementProjectionEffect::EventUpsert(event));
473        if classification != PmLiquidityClassification::Unavailable {
474            self.record_request(
475                command.request_id,
476                command.facts_digest,
477                "ClassifyPmSettlement",
478                timestamp_ms,
479            );
480        }
481        Ok(effects)
482    }
483
484    pub fn apply_set_config(
485        &mut self,
486        command: SetPmSettlementPoolConfigCommand,
487    ) -> Result<Vec<PmSettlementProjectionEffect>, String> {
488        validate_request_identity(
489            command.request_id,
490            &command.input_digest,
491            "SetPmSettlementPoolConfig",
492        )?;
493        reject_projection_version("config_version", command.config_version)?;
494        reject_projection_version("policy_version", command.config.policy_version)?;
495        validate_pool_config(&command.config).map_err(|error| error.to_string())?;
496        if let Some(existing) = self.idempotent_duplicate(
497            command.request_id,
498            &command.input_digest,
499            "SetPmSettlementPoolConfig",
500        )? {
501            return Ok(existing);
502        }
503
504        let pool = self
505            .pools
506            .entry(command.underlying.clone())
507            .or_insert_with(|| PmSettlementPoolState::empty(command.underlying.clone()));
508        if command.config_version <= pool.config_version {
509            return Err(format!(
510                "config_version must increase for {}: current {}, requested {}",
511                command.underlying, pool.config_version, command.config_version
512            ));
513        }
514
515        pool.config_version = command.config_version;
516        pool.policy_version = command.config.policy_version;
517        pool.config = Some(command.config);
518        pool.updated_at_ms = command.timestamp_ms;
519        pool.recompute_utilization()?;
520
521        let effects = vec![PmSettlementProjectionEffect::PoolUpsert(pool.clone())];
522        self.record_request(
523            command.request_id,
524            command.input_digest,
525            "SetPmSettlementPoolConfig",
526            command.timestamp_ms,
527        );
528        Ok(effects)
529    }
530
531    pub fn apply_record_vault_deposit(
532        &mut self,
533        command: RecordPmVaultDepositCommand,
534    ) -> Result<Vec<PmSettlementProjectionEffect>, String> {
535        validate_request_identity(
536            command.request_id,
537            &command.input_digest,
538            "RecordPmVaultDeposit",
539        )?;
540        reject_nonpositive("amount_usdc", command.amount_usdc)?;
541        if command.underlying.trim().is_empty() {
542            return Err("RecordPmVaultDeposit underlying must be nonempty".to_string());
543        }
544        let tx_hash = command.tx_hash.trim().to_ascii_lowercase();
545        if tx_hash.is_empty() {
546            return Err("RecordPmVaultDeposit tx_hash must be nonempty".to_string());
547        }
548        if command.max_listed_expiry_ts_ms <= 0 {
549            return Err(
550                "RecordPmVaultDeposit requires a positive max listed expiry timestamp".to_string(),
551            );
552        }
553        if command.settlement_grace_ms < 0 {
554            return Err("RecordPmVaultDeposit settlement_grace_ms must be nonnegative".to_string());
555        }
556        let lock_until_ms = command
557            .max_listed_expiry_ts_ms
558            .checked_add(command.settlement_grace_ms)
559            .ok_or_else(|| "RecordPmVaultDeposit lock_until_ms overflow".to_string())?;
560        if let Some(existing) = self.idempotent_duplicate(
561            command.request_id,
562            &command.input_digest,
563            "RecordPmVaultDeposit",
564        )? {
565            return Ok(existing);
566        }
567        if self.vault_deposits.contains_key(&command.request_id) {
568            return Err(format!(
569                "PM vault deposit {} already exists with different digest",
570                command.request_id
571            ));
572        }
573        if self.vault_deposits.values().any(|deposit| {
574            deposit.chain_id == command.chain_id
575                && deposit.source_contract_address == command.source_contract_address
576                && deposit.tx_hash == tx_hash
577                && deposit.log_index == command.log_index
578        }) {
579            return Err(format!(
580                "PM vault deposit source event already recorded: chain {} vault {} tx {} log {}",
581                command.chain_id, command.source_contract_address, tx_hash, command.log_index
582            ));
583        }
584
585        let pool = self
586            .pools
587            .get_mut(&command.underlying)
588            .ok_or_else(|| format!("unknown PM settlement pool {}", command.underlying))?;
589        pool.pool_available_usdc += command.amount_usdc;
590        pool.updated_at_ms = command.timestamp_ms;
591        pool.recompute_utilization()?;
592
593        let deposit = PmVaultDepositState {
594            deposit_id: command.request_id,
595            depositor: command.depositor,
596            underlying: command.underlying,
597            principal_usdc: command.amount_usdc,
598            remaining_usdc: command.amount_usdc,
599            withdrawn_usdc: Decimal::ZERO,
600            reserved_withdrawal_usdc: Decimal::ZERO,
601            chain_id: command.chain_id,
602            source_contract_address: command.source_contract_address,
603            tx_hash,
604            log_index: command.log_index,
605            max_listed_expiry_ts_ms: command.max_listed_expiry_ts_ms,
606            settlement_grace_ms: command.settlement_grace_ms,
607            lock_until_ms,
608            status: PmVaultDepositStatus::Active,
609            created_at_ms: command.timestamp_ms,
610            updated_at_ms: command.timestamp_ms,
611        };
612        self.vault_deposits
613            .insert(deposit.deposit_id, deposit.clone());
614        let effects = vec![
615            PmSettlementProjectionEffect::PoolUpsert(pool.clone()),
616            PmSettlementProjectionEffect::VaultDepositUpsert(deposit),
617        ];
618        self.record_request(
619            command.request_id,
620            command.input_digest,
621            "RecordPmVaultDeposit",
622            command.timestamp_ms,
623        );
624        Ok(effects)
625    }
626
627    pub fn apply_request_vault_withdrawal(
628        &mut self,
629        command: RequestPmVaultWithdrawalCommand,
630    ) -> Result<Vec<PmSettlementProjectionEffect>, String> {
631        validate_request_identity(
632            command.request_id,
633            &command.input_digest,
634            "RequestPmVaultWithdrawal",
635        )?;
636        reject_nonpositive("amount_usdc", command.amount_usdc)?;
637        if command.underlying.trim().is_empty() {
638            return Err("RequestPmVaultWithdrawal underlying must be nonempty".to_string());
639        }
640        if let Some(existing) = self.idempotent_duplicate(
641            command.request_id,
642            &command.input_digest,
643            "RequestPmVaultWithdrawal",
644        )? {
645            return Ok(existing);
646        }
647        if self.vault_withdrawals.contains_key(&command.request_id) {
648            return Err(format!(
649                "PM vault withdrawal {} already exists with different digest",
650                command.request_id
651            ));
652        }
653
654        let deposit = self
655            .vault_deposits
656            .get(&command.deposit_id)
657            .ok_or_else(|| format!("unknown PM vault deposit {}", command.deposit_id))?;
658        if deposit.depositor != command.depositor {
659            return Err("PM vault withdrawal depositor does not own deposit".to_string());
660        }
661        if deposit.underlying != command.underlying {
662            return Err("PM vault withdrawal underlying does not match deposit".to_string());
663        }
664        let request_ts_ms = i64::try_from(command.timestamp_ms)
665            .map_err(|_| "PM vault withdrawal timestamp exceeds i64".to_string())?;
666        if request_ts_ms < deposit.lock_until_ms {
667            return Err(format!(
668                "PM vault deposit {} is locked until {}",
669                command.deposit_id, deposit.lock_until_ms
670            ));
671        }
672        if deposit.remaining_usdc < command.amount_usdc {
673            return Err("PM vault withdrawal exceeds deposit remaining amount".to_string());
674        }
675        let pool = self
676            .pools
677            .get(&command.underlying)
678            .ok_or_else(|| format!("unknown PM settlement pool {}", command.underlying))?;
679        if pool.pool_available_usdc < command.amount_usdc {
680            return Err("PM vault withdrawal would exceed usable pool availability".to_string());
681        }
682
683        let deposit = self
684            .vault_deposits
685            .get_mut(&command.deposit_id)
686            .expect("deposit was validated before withdrawal mutation");
687        deposit.remaining_usdc -= command.amount_usdc;
688        deposit.reserved_withdrawal_usdc += command.amount_usdc;
689        deposit.status = if deposit.remaining_usdc == Decimal::ZERO {
690            PmVaultDepositStatus::Reserved
691        } else {
692            PmVaultDepositStatus::PartiallyReserved
693        };
694        deposit.updated_at_ms = command.timestamp_ms;
695
696        let pool = self
697            .pools
698            .get_mut(&command.underlying)
699            .expect("pool was validated before withdrawal mutation");
700        pool.pool_available_usdc -= command.amount_usdc;
701        pool.updated_at_ms = command.timestamp_ms;
702        pool.recompute_utilization()?;
703
704        let withdrawal = PmVaultWithdrawalState {
705            withdrawal_id: command.request_id,
706            deposit_id: command.deposit_id,
707            depositor: command.depositor,
708            underlying: command.underlying,
709            amount_usdc: command.amount_usdc,
710            lock_until_ms: deposit.lock_until_ms,
711            status: PmVaultWithdrawalStatus::Reserved,
712            requested_at_ms: command.timestamp_ms,
713            updated_at_ms: command.timestamp_ms,
714        };
715        self.vault_withdrawals
716            .insert(withdrawal.withdrawal_id, withdrawal.clone());
717        let effects = vec![
718            PmSettlementProjectionEffect::PoolUpsert(pool.clone()),
719            PmSettlementProjectionEffect::VaultDepositUpsert(deposit.clone()),
720            PmSettlementProjectionEffect::VaultWithdrawalUpsert(withdrawal),
721        ];
722        self.record_request(
723            command.request_id,
724            command.input_digest,
725            "RequestPmVaultWithdrawal",
726            command.timestamp_ms,
727        );
728        Ok(effects)
729    }
730
731    #[cfg(test)]
732    pub fn set_test_pool_available_usdc(
733        &mut self,
734        underlying: String,
735        amount_usdc: Decimal,
736        timestamp_ms: u64,
737    ) -> Result<(), String> {
738        reject_negative_amount("amount_usdc", amount_usdc)?;
739        let pool = self
740            .pools
741            .get_mut(&underlying)
742            .ok_or_else(|| format!("unknown PM settlement pool {underlying}"))?;
743        pool.pool_available_usdc = amount_usdc;
744        pool.updated_at_ms = timestamp_ms;
745        pool.recompute_utilization()
746    }
747
748    pub fn apply_accrue_interest(
749        &mut self,
750        command: AccruePmSettlementInterestCommand,
751    ) -> Result<Vec<PmSettlementProjectionEffect>, String> {
752        validate_request_identity(
753            command.request_id,
754            &command.input_digest,
755            "AccruePmSettlementInterest",
756        )?;
757        if let Some(existing) = self.idempotent_duplicate(
758            command.request_id,
759            &command.input_digest,
760            "AccruePmSettlementInterest",
761        )? {
762            return Ok(existing);
763        }
764
765        let key = PmSettlementAccountKey {
766            wallet: command.wallet,
767            underlying: command.underlying.clone(),
768        };
769        let account = self.accounts.get_mut(&key).ok_or_else(|| {
770            format!(
771                "unknown PM settlement account {}/{}",
772                key.wallet, key.underlying
773            )
774        })?;
775        let from_ms = account.last_interest_accrual_ms;
776        if command.to_ms < from_ms {
777            return Err(format!(
778                "accrual end {} is before account cursor {}",
779                command.to_ms, account.last_interest_accrual_ms
780            ));
781        }
782        let timestamp_ms = i64::try_from(command.timestamp_ms)
783            .map_err(|_| "command timestamp must fit in signed 64-bit milliseconds".to_string())?;
784        if command.to_ms > timestamp_ms {
785            return Err(format!(
786                "accrual end {} is after command timestamp {}",
787                command.to_ms, timestamp_ms
788            ));
789        }
790
791        let pool = self
792            .pools
793            .get(&command.underlying)
794            .ok_or_else(|| format!("unknown PM settlement pool {}", command.underlying))?;
795        let config = pool
796            .config
797            .as_ref()
798            .ok_or_else(|| format!("missing PM settlement config {}", command.underlying))?;
799        let utilization = pool.utilization.ok_or_else(|| {
800            format!(
801                "missing PM settlement pool utilization for {} - cannot accrue interest",
802                command.underlying
803            )
804        })?;
805        let apr = utilization_apr(utilization, config).map_err(|error| error.to_string())?;
806        let policy_version = config.policy_version;
807        reject_projection_version("policy_version", policy_version)?;
808
809        let principal = account.principal_liability_usdc();
810        let elapsed_ms = u64::try_from(command.to_ms - from_ms)
811            .map_err(|_| "accrual window must be nonnegative".to_string())?;
812        let interest_usdc = if principal == Decimal::ZERO || elapsed_ms == 0 {
813            Decimal::ZERO
814        } else {
815            principal * apr * Decimal::from(elapsed_ms) / Decimal::from(INTEREST_YEAR_MS)
816        };
817
818        account.accrued_interest_usdc += interest_usdc;
819        account.last_interest_accrual_ms = command.to_ms;
820        account.policy_version = policy_version;
821        account.updated_at_ms = command.timestamp_ms;
822
823        let interest = PmSettlementInterestEvent {
824            request_id: command.request_id,
825            wallet: command.wallet,
826            underlying: command.underlying,
827            accrual_start_ms: from_ms,
828            accrual_end_ms: command.to_ms,
829            utilization,
830            apr,
831            interest_amount_usdc: interest_usdc,
832            policy_version,
833        };
834        let effects = vec![
835            PmSettlementProjectionEffect::InterestEvent(interest),
836            PmSettlementProjectionEffect::AccountUpsert(account.clone()),
837        ];
838        self.record_request(
839            command.request_id,
840            command.input_digest,
841            "AccruePmSettlementInterest",
842            command.timestamp_ms,
843        );
844        Ok(effects)
845    }
846
847    pub fn apply_repayment(
848        &mut self,
849        command: ApplyPmSettlementRepaymentCommand,
850    ) -> Result<Vec<PmSettlementProjectionEffect>, String> {
851        validate_request_identity(
852            command.request_id,
853            &command.input_digest,
854            "ApplyPmSettlementRepayment",
855        )?;
856        reject_nonpositive("amount_usdc", command.amount_usdc)?;
857        if command.reason.trim().is_empty() {
858            return Err("repayment reason must be nonempty".to_string());
859        }
860        if let Some(existing) = self.idempotent_duplicate(
861            command.request_id,
862            &command.input_digest,
863            "ApplyPmSettlementRepayment",
864        )? {
865            return Ok(existing);
866        }
867
868        let key = PmSettlementAccountKey {
869            wallet: command.wallet,
870            underlying: command.underlying.clone(),
871        };
872        let account = self.accounts.get(&key).ok_or_else(|| {
873            format!(
874                "unknown PM settlement account {}/{}",
875                key.wallet, key.underlying
876            )
877        })?;
878        let pool = self
879            .pools
880            .get(&command.underlying)
881            .ok_or_else(|| format!("unknown PM settlement pool {}", command.underlying))?;
882
883        let mut remaining = command.amount_usdc;
884        let interest_paid = remaining.min(account.accrued_interest_usdc);
885        remaining -= interest_paid;
886
887        let debt_paid = remaining.min(account.debt_principal_usdc);
888        remaining -= debt_paid;
889
890        let bridge_paid = remaining.min(account.bridge_principal_usdc);
891        remaining -= bridge_paid;
892        if remaining > Decimal::ZERO {
893            return Err("repayment exceeds outstanding PM settlement liability".to_string());
894        }
895        if pool.active_timing_bridge_usdc < bridge_paid {
896            return Err("pool active timing bridge would become negative".to_string());
897        }
898        if pool.active_settlement_debt_usdc < debt_paid {
899            return Err("pool active settlement debt would become negative".to_string());
900        }
901        let active_plan_to_repay = match account.active_recovery_plan_id.as_deref() {
902            Some(plan_id) => {
903                let plan = self.recovery_plans.get(plan_id).ok_or_else(|| {
904                    format!("active PM recovery plan {plan_id} missing during repayment resolution")
905                })?;
906                recovery_plan_repaid_by_repayment(
907                    plan,
908                    account,
909                    interest_paid,
910                    debt_paid,
911                    bridge_paid,
912                )
913                .then(|| plan_id.to_string())
914            }
915            None => None,
916        };
917
918        let principal_paid = bridge_paid + debt_paid;
919        let account = self
920            .accounts
921            .get_mut(&key)
922            .expect("account was validated before repayment mutation");
923        account.accrued_interest_usdc -= interest_paid;
924        account.bridge_principal_usdc -= bridge_paid;
925        account.debt_principal_usdc -= debt_paid;
926        if active_plan_to_repay.is_some() {
927            account.active_recovery_plan_id = None;
928        }
929        let pool = self
930            .pools
931            .get_mut(&command.underlying)
932            .expect("pool was validated before repayment mutation");
933        pool.pool_available_usdc += principal_paid;
934        pool.active_timing_bridge_usdc -= bridge_paid;
935        pool.active_settlement_debt_usdc -= debt_paid;
936        pool.updated_at_ms = command.timestamp_ms;
937        pool.recompute_utilization()?;
938
939        account.status = account.status_after_repayment();
940        account.updated_at_ms = command.timestamp_ms;
941        let account_projection = account.clone();
942        let pool_projection = pool.clone();
943        let recovery_plan_effect = active_plan_to_repay.map(|plan_id| {
944            let plan = self
945                .recovery_plans
946                .get_mut(&plan_id)
947                .expect("active recovery plan was validated before repayment mutation");
948            plan.status = "Repaid".to_string();
949            plan.updated_at_ms = command.timestamp_ms;
950            PmSettlementProjectionEffect::RecoveryPlanUpsert(plan.clone())
951        });
952
953        let repayment = PmSettlementRepaymentEvent {
954            request_id: command.request_id,
955            wallet: command.wallet,
956            underlying: command.underlying,
957            source_event_id: command.source_event_id,
958            amount_usdc: command.amount_usdc,
959            interest_paid_usdc: interest_paid,
960            principal_paid_usdc: principal_paid,
961            pool_replenished_usdc: principal_paid,
962            reason: command.reason,
963            event_ts_ms: command.timestamp_ms,
964        };
965        let mut effects = vec![
966            PmSettlementProjectionEffect::RepaymentEvent(repayment),
967            PmSettlementProjectionEffect::AccountUpsert(account_projection),
968            PmSettlementProjectionEffect::PoolUpsert(pool_projection),
969        ];
970        if let Some(effect) = recovery_plan_effect {
971            effects.push(effect);
972        }
973        self.record_request(
974            command.request_id,
975            command.input_digest,
976            "ApplyPmSettlementRepayment",
977            command.timestamp_ms,
978        );
979        Ok(effects)
980    }
981
982    pub fn apply_journal_recovery_plan(
983        &mut self,
984        command: JournalPmRecoveryPlanCommand,
985    ) -> Result<Vec<PmSettlementProjectionEffect>, String> {
986        validate_request_identity(
987            command.request_id,
988            &command.input_digest,
989            "JournalPmRecoveryPlan",
990        )?;
991        reject_projection_version("policy_version", command.plan.policy_version)?;
992        reject_projection_version(
993            "recovery_priority_version",
994            command.plan.recovery_priority_version,
995        )?;
996        if command.plan.plan_id.trim().is_empty() {
997            return Err("JournalPmRecoveryPlan plan_id must be nonempty".to_string());
998        }
999        if command.plan.underlying.trim().is_empty() {
1000            return Err("JournalPmRecoveryPlan underlying must be nonempty".to_string());
1001        }
1002        if command.plan.actions.is_empty() {
1003            return Err("JournalPmRecoveryPlan actions must be nonempty".to_string());
1004        }
1005        if let Some(existing) = self.idempotent_duplicate(
1006            command.request_id,
1007            &command.input_digest,
1008            "JournalPmRecoveryPlan",
1009        )? {
1010            return Ok(existing);
1011        }
1012        if let Some(existing) = self.recovery_plans.get(&command.plan.plan_id) {
1013            if existing.input_digest != command.input_digest {
1014                return Err(format!(
1015                    "PM recovery plan {} digest mismatch",
1016                    command.plan.plan_id
1017                ));
1018            }
1019            if !recovery_plan_payload_matches(existing, &command.plan, &command.input_digest) {
1020                return Err(format!(
1021                    "PM recovery plan {} payload mismatch",
1022                    command.plan.plan_id
1023                ));
1024            }
1025            self.record_request(
1026                command.request_id,
1027                command.input_digest,
1028                "JournalPmRecoveryPlan",
1029                command.timestamp_ms,
1030            );
1031            return Ok(Vec::new());
1032        }
1033        if let Some(active_plan_id) = self
1034            .accounts
1035            .get(&PmSettlementAccountKey {
1036                wallet: command.plan.wallet,
1037                underlying: command.plan.underlying.clone(),
1038            })
1039            .and_then(|account| account.active_recovery_plan_id.as_deref())
1040        {
1041            if active_plan_id != command.plan.plan_id {
1042                let active_plan = self.recovery_plans.get(active_plan_id).ok_or_else(|| {
1043                    format!("active PM recovery plan {active_plan_id} is missing")
1044                })?;
1045                if recovery_plan_blocks_new_plan(active_plan) {
1046                    return Err(format!(
1047                        "PM recovery account already has active plan {active_plan_id}"
1048                    ));
1049                }
1050            }
1051        }
1052        if let Some(blocking_plan) = self.recovery_plans.values().find(|plan| {
1053            plan.wallet == command.plan.wallet
1054                && plan.underlying == command.plan.underlying
1055                && plan.plan_id != command.plan.plan_id
1056                && recovery_plan_blocks_new_plan(plan)
1057        }) {
1058            return Err(format!(
1059                "PM recovery account already has active plan {}",
1060                blocking_plan.plan_id
1061            ));
1062        }
1063
1064        let plan_expected_usdc_recovered = command.plan.expected_usdc_recovered;
1065        let plan_expected_obligation_reduced = command.plan.expected_obligation_reduced;
1066        let plan_expected_impact_usdc = command.plan.expected_impact_usdc;
1067        let mut expected_index = 0u32;
1068        let mut actions = Vec::with_capacity(command.plan.actions.len());
1069        for action in command.plan.actions {
1070            if action.action_index != expected_index {
1071                return Err(format!(
1072                    "PM recovery action index must be contiguous: expected {}, got {}",
1073                    expected_index, action.action_index
1074                ));
1075            }
1076            validate_recovery_action_payload(&action.action)?;
1077            reject_negative_amount("expected_usdc_recovered", action.expected_usdc_recovered)?;
1078            reject_negative_amount(
1079                "expected_obligation_reduced",
1080                action.expected_obligation_reduced,
1081            )?;
1082            reject_negative_amount("expected_impact_usdc", action.expected_impact_usdc)?;
1083            actions.push(PmRecoveryActionState {
1084                action_index: action.action_index,
1085                action_type: recovery_action_type(&action.action).to_string(),
1086                target: recovery_action_target(&action.action),
1087                action_payload: recovery_action_payload(&action.action),
1088                status: recovery_initial_action_status(&action.action).to_string(),
1089                expected_usdc_recovered: action.expected_usdc_recovered,
1090                expected_obligation_reduced: action.expected_obligation_reduced,
1091                expected_impact_usdc: action.expected_impact_usdc,
1092                attempt: 0,
1093                submitted_external_id: None,
1094                external_kind: None,
1095                result: None,
1096                result_external_id: None,
1097                recovered_usdc: Decimal::ZERO,
1098                liability_reduction_usdc: Decimal::ZERO,
1099                updated_at_ms: command.timestamp_ms,
1100            });
1101            expected_index += 1;
1102        }
1103        reject_negative_amount("target_reduction_usdc", command.plan.target_reduction_usdc)?;
1104        reject_negative_amount(
1105            "expected_usdc_recovered",
1106            command.plan.expected_usdc_recovered,
1107        )?;
1108        reject_negative_amount(
1109            "expected_obligation_reduced",
1110            plan_expected_obligation_reduced,
1111        )?;
1112        reject_negative_amount("expected_impact_usdc", plan_expected_impact_usdc)?;
1113        validate_recovery_plan_totals(
1114            plan_expected_usdc_recovered,
1115            plan_expected_obligation_reduced,
1116            plan_expected_impact_usdc,
1117            &actions,
1118        )?;
1119
1120        let plan_status = if actions
1121            .iter()
1122            .all(|action| action.status == "EscalatedManual")
1123        {
1124            "EscalatedManual"
1125        } else {
1126            "Planned"
1127        };
1128        let plan = PmRecoveryPlanState {
1129            plan_id: command.plan.plan_id,
1130            wallet: command.plan.wallet,
1131            underlying: command.plan.underlying,
1132            trigger: format!("{:?}", command.plan.trigger),
1133            reason: format!("{:?}", command.plan.reason),
1134            policy_version: command.plan.policy_version,
1135            recovery_priority_version: command.plan.recovery_priority_version,
1136            status: plan_status.to_string(),
1137            input_digest: command.input_digest.clone(),
1138            target_reduction_usdc: command.plan.target_reduction_usdc,
1139            expected_usdc_recovered: command.plan.expected_usdc_recovered,
1140            expected_obligation_reduced: command.plan.expected_obligation_reduced,
1141            expected_impact_usdc: command.plan.expected_impact_usdc,
1142            post_plan_utilization: command.plan.post_plan_utilization,
1143            actions,
1144            updated_at_ms: command.timestamp_ms,
1145        };
1146        let account_effect = if let Some(account) = self.accounts.get_mut(&PmSettlementAccountKey {
1147            wallet: plan.wallet,
1148            underlying: plan.underlying.clone(),
1149        }) {
1150            account.active_recovery_plan_id = Some(plan.plan_id.clone());
1151            account.updated_at_ms = command.timestamp_ms;
1152            Some(PmSettlementProjectionEffect::AccountUpsert(account.clone()))
1153        } else {
1154            None
1155        };
1156        self.recovery_plans
1157            .insert(plan.plan_id.clone(), plan.clone());
1158        self.record_request(
1159            command.request_id,
1160            command.input_digest,
1161            "JournalPmRecoveryPlan",
1162            command.timestamp_ms,
1163        );
1164        let mut effects = vec![PmSettlementProjectionEffect::RecoveryPlanUpsert(plan)];
1165        if let Some(effect) = account_effect {
1166            effects.push(effect);
1167        }
1168        Ok(effects)
1169    }
1170
1171    pub fn apply_mark_recovery_action_submitted(
1172        &mut self,
1173        command: MarkPmRecoveryActionSubmittedCommand,
1174    ) -> Result<Vec<PmSettlementProjectionEffect>, String> {
1175        validate_request_identity(
1176            command.request_id,
1177            &command.input_digest,
1178            "MarkPmRecoveryActionSubmitted",
1179        )?;
1180        if command.external_id.trim().is_empty() {
1181            return Err("submitted recovery external_id must be nonempty".to_string());
1182        }
1183        reject_projection_version("attempt", command.attempt)?;
1184        if let Some(existing) = self.idempotent_duplicate(
1185            command.request_id,
1186            &command.input_digest,
1187            "MarkPmRecoveryActionSubmitted",
1188        )? {
1189            return Ok(existing);
1190        }
1191        let external_kind = format!("{:?}", command.external_kind);
1192        let plan = self
1193            .recovery_plans
1194            .get_mut(&command.plan_id)
1195            .ok_or_else(|| format!("unknown PM recovery plan {}", command.plan_id))?;
1196        if plan.wallet != command.wallet {
1197            return Err(format!(
1198                "PM recovery action wallet mismatch for plan {}",
1199                command.plan_id
1200            ));
1201        }
1202        if recovery_plan_status_is_terminal(&plan.status) {
1203            return Err(format!(
1204                "cannot submit PM recovery action for terminal plan {} status {}",
1205                command.plan_id, plan.status
1206            ));
1207        }
1208        let action = plan
1209            .actions
1210            .iter_mut()
1211            .find(|action| action.action_index == command.action_index)
1212            .ok_or_else(|| {
1213                format!(
1214                    "unknown PM recovery action {}:{}",
1215                    command.plan_id, command.action_index
1216                )
1217            })?;
1218        if action.status == "Submitted"
1219            && action.attempt == command.attempt
1220            && action.submitted_external_id.as_deref() == Some(command.external_id.as_str())
1221        {
1222            if action.external_kind.as_deref() != Some(external_kind.as_str()) {
1223                return Err(format!(
1224                    "conflicting PM recovery action submission for {}:{} attempt {}",
1225                    command.plan_id, command.action_index, command.attempt
1226                ));
1227            }
1228            self.record_request(
1229                command.request_id,
1230                command.input_digest,
1231                "MarkPmRecoveryActionSubmitted",
1232                command.timestamp_ms,
1233            );
1234            return Ok(Vec::new());
1235        }
1236        if !matches!(action.status.as_str(), "Planned" | "Failed" | "TimedOut") {
1237            return Err(format!(
1238                "cannot submit PM recovery action {}:{} from status {}",
1239                command.plan_id, command.action_index, action.status
1240            ));
1241        }
1242        if command.attempt <= action.attempt {
1243            return Err(format!(
1244                "PM recovery action attempt must increase: current {}, requested {}",
1245                action.attempt, command.attempt
1246            ));
1247        }
1248        action.status = "Submitted".to_string();
1249        action.attempt = command.attempt;
1250        action.submitted_external_id = Some(command.external_id);
1251        action.external_kind = Some(external_kind);
1252        action.result = None;
1253        action.result_external_id = None;
1254        action.recovered_usdc = Decimal::ZERO;
1255        action.liability_reduction_usdc = Decimal::ZERO;
1256        action.updated_at_ms = command.timestamp_ms;
1257        plan.status = "Submitted".to_string();
1258        plan.updated_at_ms = command.timestamp_ms;
1259        let effect = PmSettlementProjectionEffect::RecoveryPlanUpsert(plan.clone());
1260        self.record_request(
1261            command.request_id,
1262            command.input_digest,
1263            "MarkPmRecoveryActionSubmitted",
1264            command.timestamp_ms,
1265        );
1266        Ok(vec![effect])
1267    }
1268
1269    pub fn apply_resolve_recovery_action(
1270        &mut self,
1271        command: ResolvePmRecoveryActionCommand,
1272    ) -> Result<Vec<PmSettlementProjectionEffect>, String> {
1273        validate_request_identity(
1274            command.request_id,
1275            &command.input_digest,
1276            "ResolvePmRecoveryAction",
1277        )?;
1278        reject_negative_amount("recovered_usdc", command.recovered_usdc)?;
1279        reject_negative_amount("liability_reduction_usdc", command.liability_reduction_usdc)?;
1280        if command.result != PmRecoveryActionResult::Confirmed
1281            && (command.recovered_usdc > Decimal::ZERO
1282                || command.liability_reduction_usdc > Decimal::ZERO)
1283        {
1284            return Err(format!(
1285                "{:?} PM recovery action resolution cannot carry recovered or liability reduction amounts",
1286                command.result
1287            ));
1288        }
1289        if let Some(existing) = self.idempotent_duplicate(
1290            command.request_id,
1291            &command.input_digest,
1292            "ResolvePmRecoveryAction",
1293        )? {
1294            return Ok(existing);
1295        }
1296        let plan = self
1297            .recovery_plans
1298            .get_mut(&command.plan_id)
1299            .ok_or_else(|| format!("unknown PM recovery plan {}", command.plan_id))?;
1300        if plan.wallet != command.wallet {
1301            return Err(format!(
1302                "PM recovery action wallet mismatch for plan {}",
1303                command.plan_id
1304            ));
1305        }
1306        let action = plan
1307            .actions
1308            .iter_mut()
1309            .find(|action| action.action_index == command.action_index)
1310            .ok_or_else(|| {
1311                format!(
1312                    "unknown PM recovery action {}:{}",
1313                    command.plan_id, command.action_index
1314                )
1315            })?;
1316        if action.result.is_some() && action.attempt == command.attempt {
1317            let stored_status = recovery_result_status(command.result);
1318            if action.result.as_deref() == Some(stored_status)
1319                && action.recovered_usdc == command.recovered_usdc
1320                && action.liability_reduction_usdc == command.liability_reduction_usdc
1321                && action.result_external_id == command.result_external_id
1322            {
1323                self.record_request(
1324                    command.request_id,
1325                    command.input_digest,
1326                    "ResolvePmRecoveryAction",
1327                    command.timestamp_ms,
1328                );
1329                return Ok(Vec::new());
1330            }
1331            return Err(format!(
1332                "conflicting PM recovery action resolution for {}:{} attempt {}",
1333                command.plan_id, command.action_index, command.attempt
1334            ));
1335        }
1336        if recovery_plan_status_is_terminal(&plan.status) {
1337            return Err(format!(
1338                "cannot resolve PM recovery action for terminal plan {} status {}",
1339                command.plan_id, plan.status
1340            ));
1341        }
1342        if action.status != "Submitted" {
1343            return Err(format!(
1344                "cannot resolve PM recovery action {}:{} from status {}",
1345                command.plan_id, command.action_index, action.status
1346            ));
1347        }
1348        if action.attempt != command.attempt {
1349            return Err(format!(
1350                "PM recovery action attempt mismatch: current {}, requested {}",
1351                action.attempt, command.attempt
1352            ));
1353        }
1354        let status = recovery_result_status(command.result).to_string();
1355        action.status = status.clone();
1356        action.result = Some(status);
1357        action.recovered_usdc = command.recovered_usdc;
1358        action.liability_reduction_usdc = command.liability_reduction_usdc;
1359        action.result_external_id = command.result_external_id;
1360        action.updated_at_ms = command.timestamp_ms;
1361        plan.status = recovery_plan_status_after_action_resolution(&plan.actions);
1362        plan.updated_at_ms = command.timestamp_ms;
1363        let effect = PmSettlementProjectionEffect::RecoveryPlanUpsert(plan.clone());
1364        let account_effect = if recovery_plan_releases_active_link(plan) {
1365            self.accounts
1366                .get_mut(&PmSettlementAccountKey {
1367                    wallet: plan.wallet,
1368                    underlying: plan.underlying.clone(),
1369                })
1370                .and_then(|account| {
1371                    if account.active_recovery_plan_id.as_deref() == Some(plan.plan_id.as_str()) {
1372                        account.active_recovery_plan_id = None;
1373                        account.updated_at_ms = command.timestamp_ms;
1374                        Some(PmSettlementProjectionEffect::AccountUpsert(account.clone()))
1375                    } else {
1376                        None
1377                    }
1378                })
1379        } else {
1380            None
1381        };
1382        self.record_request(
1383            command.request_id,
1384            command.input_digest,
1385            "ResolvePmRecoveryAction",
1386            command.timestamp_ms,
1387        );
1388        let mut effects = vec![effect];
1389        if let Some(effect) = account_effect {
1390            effects.push(effect);
1391        }
1392        Ok(effects)
1393    }
1394
1395    fn idempotent_duplicate(
1396        &self,
1397        request_id: Uuid,
1398        input_digest: &str,
1399        command_type: &str,
1400    ) -> Result<Option<Vec<PmSettlementProjectionEffect>>, String> {
1401        let request_id = request_id.to_string();
1402        let Some(existing) = self.idempotency.get(&request_id) else {
1403            return Ok(None);
1404        };
1405        if existing.input_digest != input_digest || existing.command_type != command_type {
1406            return Err(format!(
1407                "PM settlement request idempotency mismatch for {}",
1408                request_id
1409            ));
1410        }
1411        Ok(Some(Vec::new()))
1412    }
1413
1414    fn record_request(
1415        &mut self,
1416        request_id: Uuid,
1417        input_digest: String,
1418        command_type: &'static str,
1419        applied_at_ms: u64,
1420    ) {
1421        self.idempotency.insert(
1422            request_id.to_string(),
1423            PmSettlementRequestRecord {
1424                request_id,
1425                input_digest,
1426                output_digest: "ok".to_string(),
1427                command_type: command_type.to_string(),
1428                applied_at_ms,
1429            },
1430        );
1431    }
1432}
1433
1434impl PmSettlementPoolState {
1435    pub fn empty(underlying: String) -> Self {
1436        Self {
1437            underlying,
1438            pool_available_usdc: Decimal::ZERO,
1439            pool_target_usdc: Decimal::ZERO,
1440            active_timing_bridge_usdc: Decimal::ZERO,
1441            active_settlement_debt_usdc: Decimal::ZERO,
1442            config_version: 0,
1443            policy_version: 0,
1444            config: None,
1445            utilization: None,
1446            updated_at_ms: 0,
1447        }
1448    }
1449
1450    fn recompute_utilization(&mut self) -> Result<(), String> {
1451        let snapshot = self.snapshot();
1452        let capacity = pool_capacity_usdc(&snapshot).map_err(|error| error.to_string())?;
1453        self.utilization = if capacity == Decimal::ZERO {
1454            None
1455        } else {
1456            pool_utilization(&snapshot).map_err(|error| error.to_string())?
1457        };
1458        Ok(())
1459    }
1460
1461    pub fn snapshot(&self) -> PmSettlementPoolSnapshot {
1462        PmSettlementPoolSnapshot {
1463            underlying: self.underlying.clone(),
1464            pool_available_usdc: self.pool_available_usdc,
1465            pool_target_usdc: self.pool_target_usdc,
1466            active_timing_bridge_usdc: self.active_timing_bridge_usdc,
1467            active_settlement_debt_usdc: self.active_settlement_debt_usdc,
1468            config_version: self.config_version,
1469            policy_version: self.policy_version,
1470        }
1471    }
1472}
1473
1474impl PmSettlementAccountState {
1475    fn clean(
1476        wallet: WalletAddress,
1477        underlying: String,
1478        policy_version: u32,
1479        updated_at_ms: u64,
1480    ) -> Self {
1481        Self {
1482            wallet,
1483            underlying,
1484            bridge_principal_usdc: Decimal::ZERO,
1485            debt_principal_usdc: Decimal::ZERO,
1486            accrued_interest_usdc: Decimal::ZERO,
1487            last_interest_accrual_ms: updated_at_ms as i64,
1488            policy_version,
1489            bridge_deadline_ms: None,
1490            status: PmSettlementAccountStatus::Clean,
1491            active_recovery_plan_id: None,
1492            updated_at_ms,
1493        }
1494    }
1495
1496    pub fn principal_liability_usdc(&self) -> Decimal {
1497        self.bridge_principal_usdc + self.debt_principal_usdc
1498    }
1499
1500    fn status_after_repayment(&self) -> PmSettlementAccountStatus {
1501        if self.principal_liability_usdc() == Decimal::ZERO
1502            && self.accrued_interest_usdc == Decimal::ZERO
1503        {
1504            PmSettlementAccountStatus::Resolved
1505        } else if self.debt_principal_usdc > Decimal::ZERO {
1506            PmSettlementAccountStatus::Debt
1507        } else {
1508            PmSettlementAccountStatus::Bridged
1509        }
1510    }
1511}
1512
1513fn format_event_key(key: &PmSettlementEventKey) -> String {
1514    format!(
1515        "{}:{}:{}:{}:{}",
1516        key.wallet, key.market_id, key.expiry_ts_ms, key.margin_mode, key.settlement_event_sequence
1517    )
1518}
1519
1520fn validate_request_identity(
1521    request_id: Uuid,
1522    input_digest: &str,
1523    command_type: &str,
1524) -> Result<(), String> {
1525    if request_id.is_nil() {
1526        return Err(format!("{command_type} request_id must not be nil"));
1527    }
1528    if input_digest.trim().is_empty() {
1529        return Err(format!("{command_type} input_digest must be nonempty"));
1530    }
1531    Ok(())
1532}
1533
1534fn reject_nonpositive(field: &'static str, amount: Decimal) -> Result<(), String> {
1535    if amount <= Decimal::ZERO {
1536        Err(format!("{field} must be positive"))
1537    } else {
1538        Ok(())
1539    }
1540}
1541
1542fn reject_negative_amount(field: &'static str, amount: Decimal) -> Result<(), String> {
1543    if amount < Decimal::ZERO {
1544        Err(format!("{field} must be nonnegative"))
1545    } else {
1546        Ok(())
1547    }
1548}
1549
1550fn recovery_action_type(action: &PmRecoveryActionKind) -> &'static str {
1551    match action {
1552        PmRecoveryActionKind::CancelOrder { .. } => "CancelOrder",
1553        PmRecoveryActionKind::ClosePerp { .. } => "ClosePerp",
1554        PmRecoveryActionKind::CloseOption { .. } => "CloseOption",
1555        PmRecoveryActionKind::TransferCollateral { .. } => "TransferCollateral",
1556        PmRecoveryActionKind::EscalateManual { .. } => "EscalateManual",
1557    }
1558}
1559
1560fn recovery_action_target(action: &PmRecoveryActionKind) -> String {
1561    match action {
1562        PmRecoveryActionKind::CancelOrder { order_id, .. } => format!("order:{order_id}"),
1563        PmRecoveryActionKind::ClosePerp { asset, .. } => format!("perp:{asset}"),
1564        PmRecoveryActionKind::CloseOption { market_id, .. } => format!("option:{market_id}"),
1565        PmRecoveryActionKind::TransferCollateral { source, .. } => {
1566            format!("collateral:{source}")
1567        }
1568        PmRecoveryActionKind::EscalateManual { reason } => format!("manual:{reason:?}"),
1569    }
1570}
1571
1572fn recovery_action_payload(action: &PmRecoveryActionKind) -> String {
1573    serde_json::to_string(action).expect("PM recovery action payload serialization must succeed")
1574}
1575
1576fn recovery_initial_action_status(action: &PmRecoveryActionKind) -> &'static str {
1577    match action {
1578        PmRecoveryActionKind::EscalateManual { .. } => "EscalatedManual",
1579        _ => "Planned",
1580    }
1581}
1582
1583fn validate_recovery_action_payload(action: &PmRecoveryActionKind) -> Result<(), String> {
1584    match action {
1585        PmRecoveryActionKind::CancelOrder { .. } | PmRecoveryActionKind::EscalateManual { .. } => {
1586            Ok(())
1587        }
1588        PmRecoveryActionKind::ClosePerp { size, .. } => reject_nonpositive("ClosePerp size", *size),
1589        PmRecoveryActionKind::CloseOption { size, .. } => {
1590            reject_nonpositive("CloseOption size", *size)
1591        }
1592        PmRecoveryActionKind::TransferCollateral { amount_usdc, .. } => {
1593            reject_nonpositive("TransferCollateral amount_usdc", *amount_usdc)
1594        }
1595    }
1596}
1597
1598fn validate_recovery_plan_totals(
1599    plan_expected_usdc_recovered: Decimal,
1600    plan_expected_obligation_reduced: Decimal,
1601    plan_expected_impact_usdc: Decimal,
1602    actions: &[PmRecoveryActionState],
1603) -> Result<(), String> {
1604    let expected_usdc_recovered = actions
1605        .iter()
1606        .map(|action| action.expected_usdc_recovered)
1607        .sum();
1608    if plan_expected_usdc_recovered != expected_usdc_recovered {
1609        return Err(format!(
1610            "PM recovery plan expected_usdc_recovered {} must equal action total {}",
1611            plan_expected_usdc_recovered, expected_usdc_recovered
1612        ));
1613    }
1614    let expected_obligation_reduced = actions
1615        .iter()
1616        .map(|action| action.expected_obligation_reduced)
1617        .sum();
1618    if plan_expected_obligation_reduced != expected_obligation_reduced {
1619        return Err(format!(
1620            "PM recovery plan expected_obligation_reduced {} must equal action total {}",
1621            plan_expected_obligation_reduced, expected_obligation_reduced
1622        ));
1623    }
1624    let expected_impact_usdc = actions
1625        .iter()
1626        .map(|action| action.expected_impact_usdc)
1627        .sum();
1628    if plan_expected_impact_usdc != expected_impact_usdc {
1629        return Err(format!(
1630            "PM recovery plan expected_impact_usdc {} must equal action total {}",
1631            plan_expected_impact_usdc, expected_impact_usdc
1632        ));
1633    }
1634    Ok(())
1635}
1636
1637fn recovery_plan_repaid_by_repayment(
1638    plan: &PmRecoveryPlanState,
1639    account: &PmSettlementAccountState,
1640    interest_paid: Decimal,
1641    debt_paid: Decimal,
1642    bridge_paid: Decimal,
1643) -> bool {
1644    let account_resolves = account.accrued_interest_usdc == interest_paid
1645        && account.debt_principal_usdc == debt_paid
1646        && account.bridge_principal_usdc == bridge_paid;
1647    if account_resolves {
1648        return true;
1649    }
1650    let debt_resolves =
1651        account.debt_principal_usdc > Decimal::ZERO && account.debt_principal_usdc == debt_paid;
1652    let bridge_resolves = account.bridge_principal_usdc > Decimal::ZERO
1653        && account.bridge_principal_usdc == bridge_paid;
1654    let confirmed_recovered_usdc: Decimal = plan
1655        .actions
1656        .iter()
1657        .filter(|action| action.status == "Confirmed")
1658        .map(|action| action.recovered_usdc)
1659        .sum();
1660    let confirmed_repayment_allocated = confirmed_recovered_usdc > Decimal::ZERO
1661        && !recovery_plan_has_progressable_actions(&plan.actions)
1662        && interest_paid + debt_paid + bridge_paid >= confirmed_recovered_usdc;
1663    if confirmed_repayment_allocated {
1664        return true;
1665    }
1666    match (plan.trigger.as_str(), plan.reason.as_str()) {
1667        ("Debt", _) | (_, "SettlementDebt") => debt_resolves,
1668        ("OverdueBridge", _) | (_, "TimingBridge") => bridge_resolves,
1669        _ => false,
1670    }
1671}
1672
1673fn recovery_plan_payload_matches(
1674    existing: &PmRecoveryPlanState,
1675    incoming: &hypercall_engine::command::PmRecoveryPlanCommand,
1676    input_digest: &str,
1677) -> bool {
1678    existing.plan_id == incoming.plan_id
1679        && existing.wallet == incoming.wallet
1680        && existing.underlying == incoming.underlying
1681        && existing.trigger == format!("{:?}", incoming.trigger)
1682        && existing.reason == format!("{:?}", incoming.reason)
1683        && existing.policy_version == incoming.policy_version
1684        && existing.recovery_priority_version == incoming.recovery_priority_version
1685        && existing.input_digest == input_digest
1686        && existing.target_reduction_usdc == incoming.target_reduction_usdc
1687        && existing.expected_usdc_recovered == incoming.expected_usdc_recovered
1688        && existing.expected_obligation_reduced == incoming.expected_obligation_reduced
1689        && existing.expected_impact_usdc == incoming.expected_impact_usdc
1690        && existing.post_plan_utilization == incoming.post_plan_utilization
1691        && recovery_actions_payload_matches(&existing.actions, &incoming.actions)
1692}
1693
1694fn recovery_actions_payload_matches(
1695    existing: &[PmRecoveryActionState],
1696    incoming: &[hypercall_engine::command::PmRecoveryActionCommand],
1697) -> bool {
1698    existing.len() == incoming.len()
1699        && existing
1700            .iter()
1701            .zip(incoming.iter())
1702            .all(|(existing, incoming)| {
1703                existing.action_index == incoming.action_index
1704                    && existing.action_type == recovery_action_type(&incoming.action)
1705                    && existing.target == recovery_action_target(&incoming.action)
1706                    && existing.action_payload == recovery_action_payload(&incoming.action)
1707                    && existing.expected_usdc_recovered == incoming.expected_usdc_recovered
1708                    && existing.expected_obligation_reduced == incoming.expected_obligation_reduced
1709                    && existing.expected_impact_usdc == incoming.expected_impact_usdc
1710            })
1711}
1712
1713fn recovery_result_status(result: PmRecoveryActionResult) -> &'static str {
1714    match result {
1715        PmRecoveryActionResult::Confirmed => "Confirmed",
1716        PmRecoveryActionResult::Failed => "Failed",
1717        PmRecoveryActionResult::Canceled => "Canceled",
1718        PmRecoveryActionResult::TimedOut => "TimedOut",
1719    }
1720}
1721
1722fn recovery_plan_status_after_action_resolution(actions: &[PmRecoveryActionState]) -> String {
1723    if actions.iter().any(|action| action.status == "Submitted") {
1724        return "Submitted".to_string();
1725    }
1726    if actions.iter().any(|action| action.status == "Confirmed")
1727        && actions.iter().any(|action| {
1728            matches!(
1729                action.status.as_str(),
1730                "Planned" | "Failed" | "TimedOut" | "Canceled"
1731            )
1732        })
1733    {
1734        return "PartiallyRepaid".to_string();
1735    }
1736    if actions.iter().all(|action| action.status == "Confirmed") {
1737        return "Confirmed".to_string();
1738    }
1739    if actions.iter().all(|action| action.status == "Canceled") {
1740        return "Canceled".to_string();
1741    }
1742    if actions
1743        .iter()
1744        .all(|action| matches!(action.status.as_str(), "Canceled" | "Failed" | "TimedOut"))
1745    {
1746        return "Failed".to_string();
1747    }
1748    "Planned".to_string()
1749}
1750
1751fn recovery_plan_status_is_terminal(status: &str) -> bool {
1752    matches!(
1753        status,
1754        "Confirmed" | "Canceled" | "EscalatedManual" | "Repaid"
1755    )
1756}
1757
1758fn recovery_plan_has_confirmed_actions(actions: &[PmRecoveryActionState]) -> bool {
1759    actions.iter().any(|action| action.status == "Confirmed")
1760}
1761
1762fn recovery_plan_has_progressable_actions(actions: &[PmRecoveryActionState]) -> bool {
1763    actions.iter().any(|action| {
1764        matches!(
1765            action.status.as_str(),
1766            "Planned" | "Submitted" | "Failed" | "TimedOut"
1767        )
1768    })
1769}
1770
1771fn recovery_plan_blocks_new_plan(plan: &PmRecoveryPlanState) -> bool {
1772    if plan.status == "Repaid" {
1773        return false;
1774    }
1775    recovery_plan_has_confirmed_actions(&plan.actions)
1776        || (!recovery_plan_status_is_terminal(&plan.status)
1777            && recovery_plan_has_progressable_actions(&plan.actions))
1778}
1779
1780fn recovery_plan_releases_active_link(plan: &PmRecoveryPlanState) -> bool {
1781    !recovery_plan_has_confirmed_actions(&plan.actions)
1782        && (recovery_plan_status_is_terminal(&plan.status)
1783            || !recovery_plan_has_progressable_actions(&plan.actions))
1784}
1785
1786fn reject_projection_version(field: &'static str, version: u32) -> Result<(), String> {
1787    if version == 0 {
1788        return Err(format!("{field} must be positive"));
1789    }
1790    if version > i32::MAX as u32 {
1791        return Err(format!(
1792            "{field} must fit in signed 32-bit projection integer"
1793        ));
1794    }
1795    Ok(())
1796}
1797
1798#[cfg(test)]
1799mod tests {
1800    use super::*;
1801    use hypercall_engine::command::{
1802        AccruePmSettlementInterestCommand, ApplyPmSettlementRepaymentCommand,
1803        JournalPmRecoveryPlanCommand, MarkPmRecoveryActionSubmittedCommand,
1804        PmRecoveryActionCommand, PmRecoveryActionKind, PmRecoveryActionResult,
1805        PmRecoveryExternalKind, PmRecoveryPlanCommand, PmRecoveryReason, PmRecoveryTrigger,
1806        PmSettlementEventKey, RecordPmVaultDepositCommand, RequestPmVaultWithdrawalCommand,
1807        ResolvePmRecoveryActionCommand, SetPmSettlementPoolConfigCommand, TickExpiryPmSettlement,
1808    };
1809    use hypercall_margin::portfolio::{PmAccountSettlementFacts, PmSettlementObligation};
1810    use rust_decimal_macros::dec;
1811    use std::str::FromStr;
1812
1813    fn wallet() -> WalletAddress {
1814        WalletAddress::from_str("0x1234567890123456789012345678901234567890")
1815            .expect("valid test wallet")
1816    }
1817
1818    fn config() -> PmSettlementPoolConfig {
1819        PmSettlementPoolConfig {
1820            target_short_oi_notional_multiplier: dec!(0.10),
1821            utilization_kink: dec!(0.60),
1822            apr_at_kink: dec!(0.04),
1823            max_apr: dec!(4.00),
1824            normal_utilization_cap: dec!(0.80),
1825            crisis_utilization_cap: dec!(0.98),
1826            bridge_window_ms: 86_400_000,
1827            policy_version: 1,
1828        }
1829    }
1830
1831    fn request_uuid(offset: u128) -> Uuid {
1832        Uuid::from_u128(0x018f_0000_0000_7000_8000_0000_0000_0000 + offset)
1833    }
1834
1835    fn set_config(request_id: Uuid, version: u32) -> SetPmSettlementPoolConfigCommand {
1836        SetPmSettlementPoolConfigCommand {
1837            request_id,
1838            input_digest: format!("digest-{request_id}"),
1839            underlying: "BTC".to_string(),
1840            config_version: version,
1841            config: config(),
1842            timestamp_ms: 100,
1843        }
1844    }
1845
1846    fn classify_command(
1847        request_id: Uuid,
1848        net_pnl_usdc: Decimal,
1849        liquid_usdc: Decimal,
1850        equity_usdc: Decimal,
1851        maintenance_usdc: Decimal,
1852        recoverable_usdc: Decimal,
1853        pool: &PmSettlementPoolState,
1854    ) -> TickExpiryPmSettlement {
1855        let event_key = PmSettlementEventKey {
1856            wallet: wallet(),
1857            market_id: "BTC-20260630-100000-C".to_string(),
1858            expiry_ts_ms: 1_780_200_000_000,
1859            margin_mode: "portfolio".to_string(),
1860            settlement_event_sequence: 0,
1861        };
1862        let obligation = PmSettlementObligation {
1863            wallet: wallet(),
1864            market_id: event_key.market_id.clone(),
1865            expiry_ts_ms: event_key.expiry_ts_ms,
1866            underlying: "BTC".to_string(),
1867            net_pnl_usdc,
1868            settlement_obligation_usdc: Decimal::ZERO.max(-net_pnl_usdc),
1869        };
1870        TickExpiryPmSettlement {
1871            request_id,
1872            event_key,
1873            wallet: wallet(),
1874            market_id: "BTC-20260630-100000-C".to_string(),
1875            underlying: "BTC".to_string(),
1876            expiry_ts_ms: 1_780_200_000_000,
1877            settlement_obligation_usdc: obligation.settlement_obligation_usdc,
1878            liquid_usdc,
1879            pm_facts: Some(PmAccountSettlementFacts {
1880                wallet: wallet(),
1881                underlying: "BTC".to_string(),
1882                liquid_usdc,
1883                pm_equity_usdc: equity_usdc,
1884                pm_maintenance_requirement_usdc: maintenance_usdc,
1885                recoverable_collateral_usdc: recoverable_usdc,
1886                facts_as_of_ms: 1_780_100_000_000,
1887                stale: false,
1888            }),
1889            pool_snapshot: Some(pool.snapshot()),
1890            policy_version: pool.policy_version,
1891            facts_digest: format!("digest-{request_id}"),
1892            unavailable_reason: None,
1893            obligation,
1894        }
1895    }
1896
1897    fn configured_state(pool_cash: Decimal) -> PmSettlementState {
1898        let mut state = PmSettlementState::default();
1899        state
1900            .apply_set_config(set_config(request_uuid(1), 1))
1901            .unwrap();
1902        state
1903            .set_test_pool_available_usdc("BTC".to_string(), pool_cash, 200)
1904            .unwrap();
1905        state
1906    }
1907
1908    fn vault_deposit_command(
1909        request_id: Uuid,
1910        amount_usdc: Decimal,
1911        max_listed_expiry_ts_ms: i64,
1912        settlement_grace_ms: i64,
1913    ) -> RecordPmVaultDepositCommand {
1914        RecordPmVaultDepositCommand {
1915            request_id,
1916            input_digest: format!("digest-{request_id}"),
1917            depositor: wallet(),
1918            underlying: "BTC".to_string(),
1919            amount_usdc,
1920            chain_id: 42161,
1921            source_contract_address: wallet(),
1922            tx_hash: format!("0xtx{request_id}"),
1923            log_index: 0,
1924            max_listed_expiry_ts_ms,
1925            settlement_grace_ms,
1926            timestamp_ms: 1_000,
1927        }
1928    }
1929
1930    #[test]
1931    fn pm_vault_deposit_locks_until_max_listed_expiry_plus_grace() {
1932        let mut state = configured_state(Decimal::ZERO);
1933        let request_id = request_uuid(20);
1934        let effects = state
1935            .apply_record_vault_deposit(vault_deposit_command(
1936                request_id,
1937                dec!(50_000),
1938                1_800_000_000_000,
1939                86_400_000,
1940            ))
1941            .expect("vault deposit should apply");
1942
1943        let deposit = state
1944            .vault_deposits
1945            .get(&request_id)
1946            .expect("deposit should be durable engine state");
1947        assert_eq!(deposit.lock_until_ms, 1_800_086_400_000);
1948        assert_eq!(deposit.remaining_usdc, dec!(50_000));
1949        assert_eq!(
1950            state.pools.get("BTC").unwrap().pool_available_usdc,
1951            dec!(50_000)
1952        );
1953        assert!(effects.iter().any(|effect| matches!(
1954            effect,
1955            PmSettlementProjectionEffect::VaultDepositUpsert(deposit)
1956                if deposit.deposit_id == request_id
1957        )));
1958    }
1959
1960    #[test]
1961    fn pm_vault_deposit_normalizes_tx_hash_for_source_event_dedupe() {
1962        let mut state = configured_state(Decimal::ZERO);
1963        let first_id = request_uuid(121);
1964        let mut first = vault_deposit_command(first_id, dec!(1_000), 1_800_000_000_000, 0);
1965        first.tx_hash = "  0xAbCdEf  ".to_string();
1966        state
1967            .apply_record_vault_deposit(first)
1968            .expect("first vault deposit should apply");
1969
1970        let stored = state
1971            .vault_deposits
1972            .get(&first_id)
1973            .expect("first deposit should be stored");
1974        assert_eq!(stored.tx_hash, "0xabcdef");
1975
1976        let mut duplicate =
1977            vault_deposit_command(request_uuid(122), dec!(1_000), 1_800_000_000_000, 0);
1978        duplicate.tx_hash = "0xabcdef".to_string();
1979        let error = state
1980            .apply_record_vault_deposit(duplicate)
1981            .expect_err("same source event with normalized tx hash should reject");
1982        assert!(error.contains("source event already recorded"));
1983        assert_eq!(
1984            state.pools.get("BTC").unwrap().pool_available_usdc,
1985            dec!(1_000)
1986        );
1987    }
1988
1989    #[test]
1990    fn pm_vault_deposit_rejects_missing_listed_expiry() {
1991        let mut state = configured_state(Decimal::ZERO);
1992        let error = state
1993            .apply_record_vault_deposit(vault_deposit_command(
1994                request_uuid(21),
1995                dec!(1_000),
1996                0,
1997                86_400_000,
1998            ))
1999            .expect_err("deposit without max listed expiry should fail closed");
2000        assert!(error.contains("positive max listed expiry"));
2001    }
2002
2003    #[test]
2004    fn pm_settlement_state_defaults_vault_maps_for_older_snapshots() {
2005        let state = configured_state(dec!(1_000));
2006        let mut value = serde_json::to_value(&state).expect("serialize settlement state");
2007        let object = value
2008            .as_object_mut()
2009            .expect("settlement state serializes as object");
2010        object.remove("vault_deposits");
2011        object.remove("vault_withdrawals");
2012
2013        let decoded: PmSettlementState =
2014            serde_json::from_value(value).expect("older snapshot should deserialize");
2015        assert!(decoded.vault_deposits.is_empty());
2016        assert!(decoded.vault_withdrawals.is_empty());
2017        assert_eq!(
2018            decoded.pools.get("BTC").unwrap().pool_available_usdc,
2019            dec!(1_000)
2020        );
2021    }
2022
2023    #[test]
2024    fn pm_settlement_state_defaults_recovery_plans_for_older_msgpack_snapshots() {
2025        #[derive(serde::Serialize)]
2026        struct OlderPmSettlementState {
2027            pools: BTreeMap<String, PmSettlementPoolState>,
2028            accounts: BTreeMap<PmSettlementAccountKey, PmSettlementAccountState>,
2029            events: BTreeMap<PmSettlementEventKey, PmSettlementEventState>,
2030            idempotency: BTreeMap<String, PmSettlementRequestRecord>,
2031            vault_deposits: BTreeMap<Uuid, PmVaultDepositState>,
2032            vault_withdrawals: BTreeMap<Uuid, PmVaultWithdrawalState>,
2033        }
2034
2035        let state = configured_state(dec!(1_000));
2036        let older = OlderPmSettlementState {
2037            pools: state.pools,
2038            accounts: state.accounts,
2039            events: state.events,
2040            idempotency: state.idempotency,
2041            vault_deposits: state.vault_deposits,
2042            vault_withdrawals: state.vault_withdrawals,
2043        };
2044
2045        let payload = rmp_serde::to_vec(&older).expect("serialize older PM settlement state");
2046        let decoded: PmSettlementState =
2047            rmp_serde::from_slice(&payload).expect("older msgpack snapshot should deserialize");
2048
2049        assert!(decoded.recovery_plans.is_empty());
2050        assert_eq!(
2051            decoded.pools.get("BTC").unwrap().pool_available_usdc,
2052            dec!(1_000)
2053        );
2054    }
2055
2056    #[test]
2057    fn pm_vault_withdrawal_requires_unlock_and_reserves_capacity() {
2058        let mut state = configured_state(Decimal::ZERO);
2059        let deposit_id = request_uuid(22);
2060        state
2061            .apply_record_vault_deposit(vault_deposit_command(
2062                deposit_id,
2063                dec!(10_000),
2064                1_800_000_000_000,
2065                1_000,
2066            ))
2067            .expect("vault deposit should apply");
2068
2069        let locked_error = state
2070            .apply_request_vault_withdrawal(RequestPmVaultWithdrawalCommand {
2071                request_id: request_uuid(23),
2072                input_digest: "locked-withdrawal".to_string(),
2073                depositor: wallet(),
2074                underlying: "BTC".to_string(),
2075                deposit_id,
2076                amount_usdc: dec!(1_000),
2077                timestamp_ms: 1_800_000_000_999,
2078            })
2079            .expect_err("withdrawal before lock_until should reject");
2080        assert!(locked_error.contains("locked until"));
2081
2082        let effects = state
2083            .apply_request_vault_withdrawal(RequestPmVaultWithdrawalCommand {
2084                request_id: request_uuid(24),
2085                input_digest: "unlocked-withdrawal".to_string(),
2086                depositor: wallet(),
2087                underlying: "BTC".to_string(),
2088                deposit_id,
2089                amount_usdc: dec!(1_000),
2090                timestamp_ms: 1_800_000_001_000,
2091            })
2092            .expect("unlocked withdrawal should reserve capacity");
2093        let deposit = state.vault_deposits.get(&deposit_id).unwrap();
2094        assert_eq!(deposit.remaining_usdc, dec!(9_000));
2095        assert_eq!(deposit.reserved_withdrawal_usdc, dec!(1_000));
2096        assert_eq!(
2097            state.pools.get("BTC").unwrap().pool_available_usdc,
2098            dec!(9_000)
2099        );
2100        assert!(effects.iter().any(|effect| matches!(
2101            effect,
2102            PmSettlementProjectionEffect::VaultWithdrawalUpsert(withdrawal)
2103                if withdrawal.deposit_id == deposit_id
2104        )));
2105    }
2106
2107    #[test]
2108    fn pm_settlement_event_state_rejects_missing_projection_fields() {
2109        let event = PmSettlementEventState {
2110            event_key: PmSettlementEventKey {
2111                wallet: wallet(),
2112                market_id: "BTC-20260630-100000-C".to_string(),
2113                expiry_ts_ms: 1_780_200_000_000,
2114                margin_mode: "portfolio".to_string(),
2115                settlement_event_sequence: 0,
2116            },
2117            input_digest: "input-digest".to_string(),
2118            output_digest: "output-digest".to_string(),
2119            status: "Paid".to_string(),
2120            request_id: request_uuid(10),
2121            underlying: "BTC".to_string(),
2122            event_type: "settlement_classified".to_string(),
2123            amount_usdc: dec!(123),
2124        };
2125        for field in ["underlying", "event_type", "amount_usdc"] {
2126            let mut value = serde_json::to_value(&event).expect("serialize event");
2127            value
2128                .as_object_mut()
2129                .expect("event serializes as object")
2130                .remove(field);
2131
2132            let error = serde_json::from_value::<PmSettlementEventState>(value)
2133                .expect_err("missing persisted event field must fail deserialization");
2134            assert!(
2135                error.to_string().contains(field),
2136                "missing {field} should be named in error: {error}"
2137            );
2138        }
2139    }
2140
2141    #[test]
2142    fn pm_recovery_plan_state_rejects_missing_identity_fields() {
2143        let plan = PmRecoveryPlanState {
2144            plan_id: "plan-1".to_string(),
2145            wallet: wallet(),
2146            underlying: "BTC".to_string(),
2147            trigger: "settlement_debt".to_string(),
2148            reason: "test".to_string(),
2149            policy_version: 1,
2150            recovery_priority_version: 1,
2151            status: "Pending".to_string(),
2152            input_digest: "digest".to_string(),
2153            target_reduction_usdc: dec!(1),
2154            expected_usdc_recovered: dec!(1),
2155            expected_obligation_reduced: dec!(1),
2156            expected_impact_usdc: Decimal::ZERO,
2157            post_plan_utilization: Some(dec!(0.1)),
2158            actions: vec![PmRecoveryActionState {
2159                action_index: 0,
2160                action_type: "notify".to_string(),
2161                target: "manual_recovery".to_string(),
2162                action_payload: "notify:manual_recovery".to_string(),
2163                status: "Planned".to_string(),
2164                expected_usdc_recovered: Decimal::ZERO,
2165                expected_obligation_reduced: Decimal::ZERO,
2166                expected_impact_usdc: Decimal::ZERO,
2167                attempt: 0,
2168                submitted_external_id: None,
2169                external_kind: None,
2170                result: None,
2171                result_external_id: None,
2172                recovered_usdc: Decimal::ZERO,
2173                liability_reduction_usdc: Decimal::ZERO,
2174                updated_at_ms: 1,
2175            }],
2176            updated_at_ms: 1,
2177        };
2178        for field in ["wallet", "underlying"] {
2179            let mut value = serde_json::to_value(&plan).expect("serialize recovery plan");
2180            value
2181                .as_object_mut()
2182                .expect("recovery plan serializes as object")
2183                .remove(field);
2184
2185            let error = serde_json::from_value::<PmRecoveryPlanState>(value)
2186                .expect_err("missing persisted recovery identity field must fail deserialization");
2187            assert!(
2188                error.to_string().contains(field),
2189                "missing {field} should be named in error: {error}"
2190            );
2191        }
2192    }
2193
2194    #[test]
2195    fn config_version_must_increase() {
2196        let mut state = PmSettlementState::default();
2197        state
2198            .apply_set_config(set_config(request_uuid(1), 1))
2199            .unwrap();
2200
2201        let error = state
2202            .apply_set_config(set_config(request_uuid(2), 1))
2203            .expect_err("same config version should reject");
2204        assert!(error.contains("config_version must increase"));
2205    }
2206
2207    #[test]
2208    fn config_versions_must_fit_projection_integer_range() {
2209        let mut state = PmSettlementState::default();
2210
2211        let config_version_error = state
2212            .apply_set_config(set_config(request_uuid(1), i32::MAX as u32 + 1))
2213            .expect_err("oversized config version should reject before pool mutation");
2214        assert!(config_version_error.contains("config_version"));
2215        assert!(state.pools.is_empty());
2216
2217        let mut oversized_policy_config = config();
2218        oversized_policy_config.policy_version = i32::MAX as u32 + 1;
2219        let policy_version_error = state
2220            .apply_set_config(SetPmSettlementPoolConfigCommand {
2221                request_id: request_uuid(2),
2222                input_digest: "oversized-policy-version".to_string(),
2223                underlying: "BTC".to_string(),
2224                config_version: 1,
2225                config: oversized_policy_config,
2226                timestamp_ms: 100,
2227            })
2228            .expect_err("oversized policy version should reject before pool mutation");
2229        assert!(policy_version_error.contains("policy_version"));
2230        assert!(state.pools.is_empty());
2231    }
2232
2233    #[test]
2234    fn duplicate_request_with_mismatched_digest_rejects() {
2235        let mut state = PmSettlementState::default();
2236        let request_id = request_uuid(1);
2237        state.apply_set_config(set_config(request_id, 1)).unwrap();
2238
2239        let error = state
2240            .apply_set_config(SetPmSettlementPoolConfigCommand {
2241                input_digest: "different".to_string(),
2242                config_version: 2,
2243                ..set_config(request_id, 2)
2244            })
2245            .expect_err("duplicate request with different digest should reject");
2246
2247        assert!(error.contains("idempotency mismatch"));
2248    }
2249
2250    #[test]
2251    fn classify_settlement_opens_timing_bridge_and_debits_pool() {
2252        let mut state = configured_state(dec!(10_000));
2253        let pool = state.pools.get("BTC").unwrap().clone();
2254        let command = classify_command(
2255            request_uuid(10),
2256            dec!(-1_000),
2257            dec!(100),
2258            dec!(10_000),
2259            dec!(5_000),
2260            dec!(1_000),
2261            &pool,
2262        );
2263
2264        let effects = state
2265            .apply_classify_settlement(command, 1_780_200_001_000)
2266            .expect("bridge classification should apply");
2267
2268        let pool = state.pools.get("BTC").unwrap();
2269        assert_eq!(pool.pool_available_usdc, dec!(9_100));
2270        assert_eq!(pool.active_timing_bridge_usdc, dec!(900));
2271        let account = state
2272            .accounts
2273            .get(&PmSettlementAccountKey {
2274                wallet: wallet(),
2275                underlying: "BTC".to_string(),
2276            })
2277            .unwrap();
2278        assert_eq!(account.bridge_principal_usdc, dec!(900));
2279        assert_eq!(account.status, PmSettlementAccountStatus::Bridged);
2280        assert_eq!(
2281            account.bridge_deadline_ms,
2282            Some(1_780_200_001_000_i64 + config().bridge_window_ms)
2283        );
2284        assert!(effects.iter().any(|effect| matches!(
2285            effect,
2286            PmSettlementProjectionEffect::EventUpsert(event)
2287                if event.status == "TimingBridge"
2288        )));
2289    }
2290
2291    #[test]
2292    fn classify_settlement_debt_keeps_existing_active_recovery_plan() {
2293        let mut state = configured_state(dec!(10_000));
2294        let active_plan_id = "pm-recovery-existing-active";
2295        state.accounts.insert(
2296            PmSettlementAccountKey {
2297                wallet: wallet(),
2298                underlying: "BTC".to_string(),
2299            },
2300            PmSettlementAccountState {
2301                wallet: wallet(),
2302                underlying: "BTC".to_string(),
2303                bridge_principal_usdc: Decimal::ZERO,
2304                debt_principal_usdc: dec!(100),
2305                accrued_interest_usdc: Decimal::ZERO,
2306                last_interest_accrual_ms: 1_000,
2307                policy_version: 1,
2308                bridge_deadline_ms: None,
2309                status: PmSettlementAccountStatus::Debt,
2310                active_recovery_plan_id: None,
2311                updated_at_ms: 1_000,
2312            },
2313        );
2314        state
2315            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
2316                request_id: request_uuid(25),
2317                input_digest: "existing-plan-digest".to_string(),
2318                plan: recovery_plan(active_plan_id),
2319                timestamp_ms: 1_100,
2320            })
2321            .unwrap();
2322        let pool = state.pools.get("BTC").unwrap().clone();
2323        let command = classify_command(
2324            request_uuid(26),
2325            dec!(-1_000),
2326            dec!(100),
2327            dec!(10_000),
2328            dec!(5_000),
2329            dec!(100),
2330            &pool,
2331        );
2332        let auto_plan_id = format!("pm-recovery-{}", command.request_id);
2333
2334        let effects = state
2335            .apply_classify_settlement(command, 1_780_200_001_000)
2336            .expect("debt classification should apply");
2337
2338        let account = state
2339            .accounts
2340            .get(&PmSettlementAccountKey {
2341                wallet: wallet(),
2342                underlying: "BTC".to_string(),
2343            })
2344            .unwrap();
2345        assert_eq!(
2346            account.active_recovery_plan_id.as_deref(),
2347            Some(active_plan_id)
2348        );
2349        assert!(!state.recovery_plans.contains_key(&auto_plan_id));
2350        assert!(!effects.iter().any(|effect| matches!(
2351            effect,
2352            PmSettlementProjectionEffect::RecoveryPlanUpsert(plan)
2353                if plan.plan_id == auto_plan_id
2354        )));
2355    }
2356
2357    #[test]
2358    fn classify_settlement_rejects_liquid_usdc_mismatch() {
2359        let mut state = configured_state(dec!(10_000));
2360        let before = state.clone();
2361        let pool = state.pools.get("BTC").unwrap().clone();
2362        let mut command = classify_command(
2363            request_uuid(30),
2364            dec!(-1_000),
2365            dec!(100),
2366            dec!(10_000),
2367            dec!(5_000),
2368            dec!(1_000),
2369            &pool,
2370        );
2371        command
2372            .pm_facts
2373            .as_mut()
2374            .expect("test command includes PM facts")
2375            .liquid_usdc = dec!(1_000);
2376
2377        let error = state
2378            .apply_classify_settlement(command, 1_780_200_001_000)
2379            .expect_err("liquid mismatch should reject before classification");
2380
2381        assert!(error.contains("liquid_usdc mismatch"));
2382        assert_eq!(state, before);
2383    }
2384
2385    #[test]
2386    fn classify_settlement_rejects_pm_facts_wallet_mismatch() {
2387        let mut state = configured_state(dec!(10_000));
2388        let before = state.clone();
2389        let pool = state.pools.get("BTC").unwrap().clone();
2390        let mut command = classify_command(
2391            request_uuid(31),
2392            dec!(-1_000),
2393            dec!(100),
2394            dec!(10_000),
2395            dec!(5_000),
2396            dec!(1_000),
2397            &pool,
2398        );
2399        command
2400            .pm_facts
2401            .as_mut()
2402            .expect("test command includes PM facts")
2403            .wallet = WalletAddress::from_str("0x2222222222222222222222222222222222222222")
2404            .expect("valid test wallet");
2405
2406        let error = state
2407            .apply_classify_settlement(command, 1_780_200_001_000)
2408            .expect_err("wallet mismatch should reject before classification");
2409
2410        assert!(error.contains("PM facts wallet mismatch"));
2411        assert_eq!(state, before);
2412    }
2413
2414    #[test]
2415    fn classify_settlement_rejects_pm_facts_underlying_mismatch() {
2416        let mut state = configured_state(dec!(10_000));
2417        let before = state.clone();
2418        let pool = state.pools.get("BTC").unwrap().clone();
2419        let mut command = classify_command(
2420            request_uuid(32),
2421            dec!(-1_000),
2422            dec!(100),
2423            dec!(10_000),
2424            dec!(5_000),
2425            dec!(1_000),
2426            &pool,
2427        );
2428        command
2429            .pm_facts
2430            .as_mut()
2431            .expect("test command includes PM facts")
2432            .underlying = "ETH".to_string();
2433
2434        let error = state
2435            .apply_classify_settlement(command, 1_780_200_001_000)
2436            .expect_err("underlying mismatch should reject before classification");
2437
2438        assert!(error.contains("PM facts underlying mismatch"));
2439        assert_eq!(state, before);
2440    }
2441
2442    #[test]
2443    fn classify_settlement_pool_shortfall_is_unavailable_without_mutating_liability() {
2444        let mut state = configured_state(dec!(500));
2445        let before = state.clone();
2446        let pool = state.pools.get("BTC").unwrap().clone();
2447        let command = classify_command(
2448            request_uuid(11),
2449            dec!(-1_000),
2450            dec!(100),
2451            dec!(10_000),
2452            dec!(5_000),
2453            dec!(1_000),
2454            &pool,
2455        );
2456        let event_key = command.event_key.clone();
2457
2458        let effects = state
2459            .apply_classify_settlement(command, 1_780_200_001_000)
2460            .expect("shortfall should produce unavailable event");
2461
2462        assert_eq!(state.pools, before.pools);
2463        assert!(state.accounts.is_empty());
2464        assert!(effects.iter().any(|effect| matches!(
2465            effect,
2466            PmSettlementProjectionEffect::EventUpsert(event)
2467                if event.status == "Unavailable"
2468        )));
2469        assert!(!state.events.contains_key(&event_key));
2470    }
2471
2472    #[test]
2473    fn classify_settlement_unavailable_does_not_burn_retry_event_key() {
2474        let mut state = configured_state(dec!(500));
2475        let pool = state.pools.get("BTC").unwrap().clone();
2476        let unavailable = classify_command(
2477            request_uuid(12),
2478            dec!(-1_000),
2479            dec!(100),
2480            dec!(10_000),
2481            dec!(5_000),
2482            dec!(1_000),
2483            &pool,
2484        );
2485        let event_key = unavailable.event_key.clone();
2486
2487        state
2488            .apply_classify_settlement(unavailable, 1_780_200_001_000)
2489            .expect("pool shortfall should emit unavailable event");
2490        assert!(!state.events.contains_key(&event_key));
2491
2492        state
2493            .set_test_pool_available_usdc("BTC".to_string(), dec!(10_000), 200)
2494            .unwrap();
2495        let pool = state.pools.get("BTC").unwrap().clone();
2496        let mut retry = classify_command(
2497            request_uuid(13),
2498            dec!(-1_000),
2499            dec!(100),
2500            dec!(10_000),
2501            dec!(5_000),
2502            dec!(1_000),
2503            &pool,
2504        );
2505        retry.event_key = event_key.clone();
2506        retry.facts_digest = "digest-classify-after-unavailable".to_string();
2507
2508        state
2509            .apply_classify_settlement(retry, 1_780_200_002_000)
2510            .expect("fresh retry should classify after unavailable event");
2511        assert!(state.events.contains_key(&event_key));
2512        assert_eq!(
2513            state.pools.get("BTC").unwrap().active_timing_bridge_usdc,
2514            dec!(900)
2515        );
2516    }
2517
2518    #[test]
2519    fn classify_settlement_duplicate_same_digest_is_idempotent() {
2520        let mut state = configured_state(dec!(10_000));
2521        let pool = state.pools.get("BTC").unwrap().clone();
2522        let command = classify_command(
2523            request_uuid(14),
2524            dec!(-1_000),
2525            dec!(100),
2526            dec!(10_000),
2527            dec!(5_000),
2528            dec!(1_000),
2529            &pool,
2530        );
2531
2532        state
2533            .apply_classify_settlement(command.clone(), 1_780_200_001_000)
2534            .unwrap();
2535        let effects = state
2536            .apply_classify_settlement(command, 1_780_200_001_000)
2537            .unwrap();
2538
2539        assert!(effects.is_empty());
2540        assert_eq!(
2541            state.pools.get("BTC").unwrap().active_timing_bridge_usdc,
2542            dec!(900)
2543        );
2544    }
2545
2546    #[test]
2547    fn classify_settlement_duplicate_event_key_with_changed_digest_rejects() {
2548        let mut state = configured_state(dec!(10_000));
2549        let pool = state.pools.get("BTC").unwrap().clone();
2550        let command = classify_command(
2551            request_uuid(15),
2552            dec!(-1_000),
2553            dec!(100),
2554            dec!(10_000),
2555            dec!(5_000),
2556            dec!(1_000),
2557            &pool,
2558        );
2559        state
2560            .apply_classify_settlement(command.clone(), 1_780_200_001_000)
2561            .unwrap();
2562
2563        let mut changed = classify_command(
2564            request_uuid(16),
2565            dec!(-1_000),
2566            dec!(100),
2567            dec!(10_000),
2568            dec!(5_000),
2569            dec!(1_000),
2570            &state.pools.get("BTC").unwrap().clone(),
2571        );
2572        changed.event_key = command.event_key;
2573        changed.facts_digest = "different-digest".to_string();
2574
2575        let error = state
2576            .apply_classify_settlement(changed, 1_780_200_001_000)
2577            .expect_err("same event key with changed digest must reject");
2578        assert!(error.contains("event digest mismatch"));
2579    }
2580
2581    #[test]
2582    fn interest_rejects_backwards_cursor() {
2583        let mut state = configured_state(dec!(1_000));
2584        let key = PmSettlementAccountKey {
2585            wallet: wallet(),
2586            underlying: "BTC".to_string(),
2587        };
2588        state.accounts.insert(
2589            key,
2590            PmSettlementAccountState {
2591                wallet: wallet(),
2592                underlying: "BTC".to_string(),
2593                bridge_principal_usdc: dec!(100),
2594                debt_principal_usdc: Decimal::ZERO,
2595                accrued_interest_usdc: Decimal::ZERO,
2596                last_interest_accrual_ms: 1000,
2597                policy_version: 1,
2598                bridge_deadline_ms: None,
2599                status: PmSettlementAccountStatus::Bridged,
2600                active_recovery_plan_id: None,
2601                updated_at_ms: 1000,
2602            },
2603        );
2604
2605        let error = state
2606            .apply_accrue_interest(AccruePmSettlementInterestCommand {
2607                request_id: request_uuid(3),
2608                input_digest: "interest-digest".to_string(),
2609                wallet: wallet(),
2610                underlying: "BTC".to_string(),
2611                to_ms: 999,
2612                timestamp_ms: 1100,
2613            })
2614            .expect_err("backwards cursor should reject");
2615
2616        assert!(error.contains("accrual end"));
2617    }
2618
2619    #[test]
2620    fn interest_rejects_future_dated_window() {
2621        let mut state = configured_state(dec!(1_000));
2622        let key = PmSettlementAccountKey {
2623            wallet: wallet(),
2624            underlying: "BTC".to_string(),
2625        };
2626        state.accounts.insert(
2627            key,
2628            PmSettlementAccountState {
2629                wallet: wallet(),
2630                underlying: "BTC".to_string(),
2631                bridge_principal_usdc: dec!(100),
2632                debt_principal_usdc: Decimal::ZERO,
2633                accrued_interest_usdc: Decimal::ZERO,
2634                last_interest_accrual_ms: 1000,
2635                policy_version: 1,
2636                bridge_deadline_ms: None,
2637                status: PmSettlementAccountStatus::Bridged,
2638                active_recovery_plan_id: None,
2639                updated_at_ms: 1000,
2640            },
2641        );
2642
2643        let error = state
2644            .apply_accrue_interest(AccruePmSettlementInterestCommand {
2645                request_id: request_uuid(4),
2646                input_digest: "future-interest-digest".to_string(),
2647                wallet: wallet(),
2648                underlying: "BTC".to_string(),
2649                to_ms: 1200,
2650                timestamp_ms: 1100,
2651            })
2652            .expect_err("future-dated accrual window should reject");
2653
2654        assert!(error.contains("after command timestamp"));
2655    }
2656
2657    #[test]
2658    fn interest_derives_policy_utilization_apr_and_amount_from_pool_state() {
2659        let mut state = configured_state(dec!(10_000));
2660        {
2661            let pool = state.pools.get_mut("BTC").unwrap();
2662            pool.pool_available_usdc = dec!(7_500);
2663            pool.active_timing_bridge_usdc = dec!(2_500);
2664            pool.recompute_utilization().unwrap();
2665        }
2666        let key = PmSettlementAccountKey {
2667            wallet: wallet(),
2668            underlying: "BTC".to_string(),
2669        };
2670        state.accounts.insert(
2671            key.clone(),
2672            PmSettlementAccountState {
2673                wallet: wallet(),
2674                underlying: "BTC".to_string(),
2675                bridge_principal_usdc: dec!(100),
2676                debt_principal_usdc: Decimal::ZERO,
2677                accrued_interest_usdc: Decimal::ZERO,
2678                last_interest_accrual_ms: 0,
2679                policy_version: 1,
2680                bridge_deadline_ms: None,
2681                status: PmSettlementAccountStatus::Bridged,
2682                active_recovery_plan_id: None,
2683                updated_at_ms: 0,
2684            },
2685        );
2686
2687        let effects = state
2688            .apply_accrue_interest(AccruePmSettlementInterestCommand {
2689                request_id: request_uuid(3),
2690                input_digest: "interest-derived-digest".to_string(),
2691                wallet: wallet(),
2692                underlying: "BTC".to_string(),
2693                to_ms: INTEREST_YEAR_MS as i64,
2694                timestamp_ms: INTEREST_YEAR_MS,
2695            })
2696            .expect("engine-derived interest should accrue");
2697
2698        let account = state.accounts.get(&key).unwrap();
2699        assert_eq!(account.policy_version, 1);
2700        assert_eq!(account.last_interest_accrual_ms, INTEREST_YEAR_MS as i64);
2701        assert_eq!(
2702            account.accrued_interest_usdc,
2703            dec!(100) * dec!(0.0166666666666666666666666667)
2704        );
2705        assert!(effects.iter().any(|effect| matches!(
2706            effect,
2707            PmSettlementProjectionEffect::InterestEvent(event)
2708                if event.utilization == dec!(0.25)
2709                    && event.apr == dec!(0.0166666666666666666666666667)
2710                    && event.interest_amount_usdc == account.accrued_interest_usdc
2711                    && event.policy_version == 1
2712        )));
2713        assert!(effects.iter().any(|effect| matches!(
2714            effect,
2715            PmSettlementProjectionEffect::AccountUpsert(account)
2716                if account.policy_version == 1
2717        )));
2718    }
2719
2720    #[test]
2721    fn repayment_pays_interest_before_principal_and_replenishes_pool_for_principal() {
2722        let mut state = PmSettlementState::default();
2723        state
2724            .apply_set_config(set_config(request_uuid(1), 1))
2725            .unwrap();
2726        state
2727            .set_test_pool_available_usdc("BTC".to_string(), dec!(500), 200)
2728            .unwrap();
2729        let pool = state.pools.get_mut("BTC").unwrap();
2730        pool.active_timing_bridge_usdc = dec!(100);
2731        pool.active_settlement_debt_usdc = dec!(50);
2732        pool.recompute_utilization().unwrap();
2733        let key = PmSettlementAccountKey {
2734            wallet: wallet(),
2735            underlying: "BTC".to_string(),
2736        };
2737        state.accounts.insert(
2738            key.clone(),
2739            PmSettlementAccountState {
2740                wallet: wallet(),
2741                underlying: "BTC".to_string(),
2742                bridge_principal_usdc: dec!(100),
2743                debt_principal_usdc: dec!(50),
2744                accrued_interest_usdc: dec!(10),
2745                last_interest_accrual_ms: 1000,
2746                policy_version: 1,
2747                bridge_deadline_ms: None,
2748                status: PmSettlementAccountStatus::Debt,
2749                active_recovery_plan_id: None,
2750                updated_at_ms: 1000,
2751            },
2752        );
2753
2754        state
2755            .apply_repayment(ApplyPmSettlementRepaymentCommand {
2756                request_id: request_uuid(3),
2757                input_digest: "repay-digest".to_string(),
2758                wallet: wallet(),
2759                underlying: "BTC".to_string(),
2760                amount_usdc: dec!(80),
2761                reason: "admin_test".to_string(),
2762                source_event_id: "source-1".to_string(),
2763                timestamp_ms: 1200,
2764            })
2765            .unwrap();
2766
2767        let account = state.accounts.get(&key).unwrap();
2768        assert_eq!(account.accrued_interest_usdc, Decimal::ZERO);
2769        assert_eq!(account.bridge_principal_usdc, dec!(80));
2770        assert_eq!(account.debt_principal_usdc, Decimal::ZERO);
2771        assert_eq!(
2772            state.pools.get("BTC").unwrap().pool_available_usdc,
2773            dec!(570)
2774        );
2775    }
2776
2777    #[test]
2778    fn repayment_rejects_overpayment_without_mutating_state() {
2779        let mut state = PmSettlementState::default();
2780        state
2781            .apply_set_config(set_config(request_uuid(1), 1))
2782            .unwrap();
2783        state
2784            .set_test_pool_available_usdc("BTC".to_string(), dec!(500), 200)
2785            .unwrap();
2786        let pool = state.pools.get_mut("BTC").unwrap();
2787        pool.active_timing_bridge_usdc = dec!(100);
2788        pool.recompute_utilization().unwrap();
2789        let key = PmSettlementAccountKey {
2790            wallet: wallet(),
2791            underlying: "BTC".to_string(),
2792        };
2793        state.accounts.insert(
2794            key.clone(),
2795            PmSettlementAccountState {
2796                wallet: wallet(),
2797                underlying: "BTC".to_string(),
2798                bridge_principal_usdc: dec!(100),
2799                debt_principal_usdc: Decimal::ZERO,
2800                accrued_interest_usdc: dec!(10),
2801                last_interest_accrual_ms: 1000,
2802                policy_version: 1,
2803                bridge_deadline_ms: None,
2804                status: PmSettlementAccountStatus::Bridged,
2805                active_recovery_plan_id: None,
2806                updated_at_ms: 1000,
2807            },
2808        );
2809        let before = state.clone();
2810
2811        let error = state
2812            .apply_repayment(ApplyPmSettlementRepaymentCommand {
2813                request_id: request_uuid(3),
2814                input_digest: "repay-too-much-digest".to_string(),
2815                wallet: wallet(),
2816                underlying: "BTC".to_string(),
2817                amount_usdc: dec!(111),
2818                reason: "admin_test".to_string(),
2819                source_event_id: "source-1".to_string(),
2820                timestamp_ms: 1200,
2821            })
2822            .expect_err("overpayment must reject");
2823
2824        assert!(error.contains("exceeds outstanding"));
2825        assert_eq!(state, before);
2826        assert!(!state.idempotency.contains_key(&request_uuid(3).to_string()));
2827    }
2828
2829    #[test]
2830    fn repayment_resolves_active_recovery_plan() {
2831        let mut state = configured_state(dec!(500));
2832        let key = PmSettlementAccountKey {
2833            wallet: wallet(),
2834            underlying: "BTC".to_string(),
2835        };
2836        state.accounts.insert(
2837            key.clone(),
2838            PmSettlementAccountState {
2839                wallet: wallet(),
2840                underlying: "BTC".to_string(),
2841                bridge_principal_usdc: Decimal::ZERO,
2842                debt_principal_usdc: dec!(100),
2843                accrued_interest_usdc: dec!(5),
2844                last_interest_accrual_ms: 1000,
2845                policy_version: 1,
2846                bridge_deadline_ms: None,
2847                status: PmSettlementAccountStatus::Debt,
2848                active_recovery_plan_id: None,
2849                updated_at_ms: 1000,
2850            },
2851        );
2852        let pool = state.pools.get_mut("BTC").unwrap();
2853        pool.active_settlement_debt_usdc = dec!(100);
2854        pool.recompute_utilization().unwrap();
2855        let plan_id = "pm-recovery-repaid";
2856        state
2857            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
2858                request_id: request_uuid(20),
2859                input_digest: "plan-digest".to_string(),
2860                plan: recovery_plan(plan_id),
2861                timestamp_ms: 1_100,
2862            })
2863            .unwrap();
2864
2865        let effects = state
2866            .apply_repayment(ApplyPmSettlementRepaymentCommand {
2867                request_id: request_uuid(21),
2868                input_digest: "repay-active-plan-digest".to_string(),
2869                wallet: wallet(),
2870                underlying: "BTC".to_string(),
2871                amount_usdc: dec!(105),
2872                reason: "admin_test".to_string(),
2873                source_event_id: "source-1".to_string(),
2874                timestamp_ms: 1_200,
2875            })
2876            .expect("full repayment should resolve active recovery plan");
2877
2878        let account = state.accounts.get(&key).unwrap();
2879        assert_eq!(account.status, PmSettlementAccountStatus::Resolved);
2880        assert_eq!(account.active_recovery_plan_id, None);
2881        assert_eq!(state.recovery_plans.get(plan_id).unwrap().status, "Repaid");
2882        assert!(effects.iter().any(|effect| matches!(
2883            effect,
2884            PmSettlementProjectionEffect::RecoveryPlanUpsert(plan)
2885                if plan.plan_id == plan_id && plan.status == "Repaid"
2886        )));
2887
2888        let before_late_submit = state.clone();
2889        let error = state
2890            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
2891                request_id: request_uuid(22),
2892                input_digest: "late-submit-after-repaid-digest".to_string(),
2893                wallet: wallet(),
2894                plan_id: plan_id.to_string(),
2895                action_index: 0,
2896                attempt: 1,
2897                external_id: "late-directive".to_string(),
2898                external_kind: PmRecoveryExternalKind::Directive,
2899                timestamp_ms: 1_300,
2900            })
2901            .expect_err("repaid plans must reject late submissions");
2902
2903        assert!(error.contains("terminal plan"));
2904        assert_eq!(state, before_late_submit);
2905    }
2906
2907    #[test]
2908    fn repayment_retires_debt_recovery_plan_when_bridge_remains() {
2909        let mut state = configured_state(dec!(500));
2910        let key = PmSettlementAccountKey {
2911            wallet: wallet(),
2912            underlying: "BTC".to_string(),
2913        };
2914        state.accounts.insert(
2915            key.clone(),
2916            PmSettlementAccountState {
2917                wallet: wallet(),
2918                underlying: "BTC".to_string(),
2919                bridge_principal_usdc: dec!(80),
2920                debt_principal_usdc: dec!(50),
2921                accrued_interest_usdc: Decimal::ZERO,
2922                last_interest_accrual_ms: 1000,
2923                policy_version: 1,
2924                bridge_deadline_ms: None,
2925                status: PmSettlementAccountStatus::Debt,
2926                active_recovery_plan_id: None,
2927                updated_at_ms: 1000,
2928            },
2929        );
2930        let pool = state.pools.get_mut("BTC").unwrap();
2931        pool.active_timing_bridge_usdc = dec!(80);
2932        pool.active_settlement_debt_usdc = dec!(50);
2933        pool.recompute_utilization().unwrap();
2934        let plan_id = "pm-recovery-debt-repaid";
2935        state
2936            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
2937                request_id: request_uuid(23),
2938                input_digest: "plan-digest".to_string(),
2939                plan: recovery_plan(plan_id),
2940                timestamp_ms: 1_100,
2941            })
2942            .unwrap();
2943
2944        let effects = state
2945            .apply_repayment(ApplyPmSettlementRepaymentCommand {
2946                request_id: request_uuid(24),
2947                input_digest: "repay-debt-only-digest".to_string(),
2948                wallet: wallet(),
2949                underlying: "BTC".to_string(),
2950                amount_usdc: dec!(50),
2951                reason: "admin_test".to_string(),
2952                source_event_id: "source-1".to_string(),
2953                timestamp_ms: 1_200,
2954            })
2955            .expect("debt repayment should retire debt recovery plan");
2956
2957        let account = state.accounts.get(&key).unwrap();
2958        assert_eq!(account.status, PmSettlementAccountStatus::Bridged);
2959        assert_eq!(account.debt_principal_usdc, Decimal::ZERO);
2960        assert_eq!(account.bridge_principal_usdc, dec!(80));
2961        assert_eq!(account.active_recovery_plan_id, None);
2962        assert_eq!(state.recovery_plans.get(plan_id).unwrap().status, "Repaid");
2963        assert!(effects.iter().any(|effect| matches!(
2964            effect,
2965            PmSettlementProjectionEffect::RecoveryPlanUpsert(plan)
2966                if plan.plan_id == plan_id && plan.status == "Repaid"
2967        )));
2968    }
2969
2970    #[test]
2971    fn repaid_recovery_plan_rejects_late_resolution() {
2972        let mut state = configured_state(dec!(500));
2973        let key = PmSettlementAccountKey {
2974            wallet: wallet(),
2975            underlying: "BTC".to_string(),
2976        };
2977        state.accounts.insert(
2978            key,
2979            PmSettlementAccountState {
2980                wallet: wallet(),
2981                underlying: "BTC".to_string(),
2982                bridge_principal_usdc: Decimal::ZERO,
2983                debt_principal_usdc: dec!(100),
2984                accrued_interest_usdc: Decimal::ZERO,
2985                last_interest_accrual_ms: 1000,
2986                policy_version: 1,
2987                bridge_deadline_ms: None,
2988                status: PmSettlementAccountStatus::Debt,
2989                active_recovery_plan_id: None,
2990                updated_at_ms: 1000,
2991            },
2992        );
2993        let pool = state.pools.get_mut("BTC").unwrap();
2994        pool.active_settlement_debt_usdc = dec!(100);
2995        pool.recompute_utilization().unwrap();
2996        let plan_id = "pm-recovery-late-resolution";
2997        state
2998            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
2999                request_id: request_uuid(27),
3000                input_digest: "plan-digest".to_string(),
3001                plan: recovery_plan(plan_id),
3002                timestamp_ms: 1_100,
3003            })
3004            .unwrap();
3005        state
3006            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
3007                request_id: request_uuid(28),
3008                input_digest: "submit-digest".to_string(),
3009                wallet: wallet(),
3010                plan_id: plan_id.to_string(),
3011                action_index: 0,
3012                attempt: 1,
3013                external_id: "directive-late-resolution".to_string(),
3014                external_kind: PmRecoveryExternalKind::Directive,
3015                timestamp_ms: 1_150,
3016            })
3017            .unwrap();
3018        state
3019            .apply_repayment(ApplyPmSettlementRepaymentCommand {
3020                request_id: request_uuid(29),
3021                input_digest: "repay-before-resolution-digest".to_string(),
3022                wallet: wallet(),
3023                underlying: "BTC".to_string(),
3024                amount_usdc: dec!(100),
3025                reason: "admin_test".to_string(),
3026                source_event_id: "source-1".to_string(),
3027                timestamp_ms: 1_200,
3028            })
3029            .unwrap();
3030        let before_late_resolution = state.clone();
3031
3032        let error = state
3033            .apply_resolve_recovery_action(ResolvePmRecoveryActionCommand {
3034                request_id: request_uuid(30),
3035                input_digest: "late-resolution-digest".to_string(),
3036                wallet: wallet(),
3037                plan_id: plan_id.to_string(),
3038                action_index: 0,
3039                attempt: 1,
3040                result: PmRecoveryActionResult::Confirmed,
3041                recovered_usdc: dec!(100),
3042                liability_reduction_usdc: dec!(100),
3043                result_external_id: Some("directive-late-resolution".to_string()),
3044                timestamp_ms: 1_300,
3045            })
3046            .expect_err("terminal repaid plan must reject late resolution");
3047
3048        assert!(error.contains("terminal plan"));
3049        assert_eq!(state, before_late_resolution);
3050        assert_eq!(state.recovery_plans.get(plan_id).unwrap().status, "Repaid");
3051    }
3052
3053    fn recovery_plan(plan_id: &str) -> PmRecoveryPlanCommand {
3054        PmRecoveryPlanCommand {
3055            plan_id: plan_id.to_string(),
3056            wallet: wallet(),
3057            underlying: "BTC".to_string(),
3058            trigger: PmRecoveryTrigger::Debt,
3059            reason: PmRecoveryReason::SettlementDebt,
3060            policy_version: 1,
3061            recovery_priority_version: 1,
3062            target_reduction_usdc: dec!(100),
3063            expected_usdc_recovered: dec!(100),
3064            expected_obligation_reduced: dec!(100),
3065            expected_impact_usdc: dec!(1),
3066            post_plan_utilization: Some(dec!(0.25)),
3067            actions: vec![PmRecoveryActionCommand {
3068                action_index: 0,
3069                action: PmRecoveryActionKind::ClosePerp {
3070                    asset: "BTC".to_string(),
3071                    size: dec!(0.1),
3072                    reduce_only: true,
3073                },
3074                expected_usdc_recovered: dec!(100),
3075                expected_obligation_reduced: dec!(100),
3076                expected_impact_usdc: dec!(1),
3077            }],
3078        }
3079    }
3080
3081    #[test]
3082    fn duplicate_recovery_plan_rejournal_rejects_conflicting_payload() {
3083        let mut state = configured_state(dec!(10_000));
3084        let plan_id = "pm-recovery-duplicate-payload";
3085        state
3086            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3087                request_id: request_uuid(49),
3088                input_digest: "recovery-plan-digest".to_string(),
3089                plan: recovery_plan(plan_id),
3090                timestamp_ms: 1_000,
3091            })
3092            .expect("initial plan should journal");
3093        let before = state.clone();
3094
3095        let mut conflicting_plan = recovery_plan(plan_id);
3096        conflicting_plan.actions[0].action = PmRecoveryActionKind::ClosePerp {
3097            asset: "BTC".to_string(),
3098            size: dec!(0.2),
3099            reduce_only: true,
3100        };
3101        let error = state
3102            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3103                request_id: request_uuid(50),
3104                input_digest: "recovery-plan-digest".to_string(),
3105                plan: conflicting_plan,
3106                timestamp_ms: 1_100,
3107            })
3108            .expect_err("same plan id and digest with different payload must reject");
3109
3110        assert!(error.contains("payload mismatch"));
3111        assert_eq!(state, before);
3112        assert!(!state
3113            .idempotency
3114            .contains_key(&request_uuid(50).to_string()));
3115    }
3116
3117    #[test]
3118    fn recovery_plan_journal_rejects_invalid_action_payload_amounts() {
3119        let mut state = configured_state(dec!(10_000));
3120        let before = state.clone();
3121        let mut bad_transfer = recovery_plan("pm-recovery-bad-transfer");
3122        bad_transfer.actions[0].action = PmRecoveryActionKind::TransferCollateral {
3123            source: "external".to_string(),
3124            amount_usdc: dec!(-1),
3125        };
3126
3127        let error = state
3128            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3129                request_id: request_uuid(48),
3130                input_digest: "bad-transfer-digest".to_string(),
3131                plan: bad_transfer,
3132                timestamp_ms: 1_000,
3133            })
3134            .expect_err("negative transfer action amount must reject");
3135
3136        assert!(error.contains("TransferCollateral amount_usdc"));
3137        assert_eq!(state, before);
3138
3139        let mut bad_close = recovery_plan("pm-recovery-bad-close");
3140        bad_close.actions[0].action = PmRecoveryActionKind::CloseOption {
3141            market_id: "BTC-20260630-100000-C".to_string(),
3142            size: Decimal::ZERO,
3143            side: hypercall_types::Side::Buy,
3144        };
3145        let error = state
3146            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3147                request_id: request_uuid(47),
3148                input_digest: "bad-close-digest".to_string(),
3149                plan: bad_close,
3150                timestamp_ms: 1_001,
3151            })
3152            .expect_err("zero close action size must reject");
3153
3154        assert!(error.contains("CloseOption size"));
3155        assert_eq!(state, before);
3156    }
3157
3158    #[test]
3159    fn recovery_plan_journal_rejects_mismatched_plan_totals() {
3160        let mut state = configured_state(dec!(10_000));
3161        let mut plan = recovery_plan("pm-recovery-bad-total");
3162        plan.expected_usdc_recovered += dec!(1);
3163        let before = state.clone();
3164
3165        let error = state
3166            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3167                request_id: request_uuid(46),
3168                input_digest: "bad-total-digest".to_string(),
3169                plan,
3170                timestamp_ms: 1_000,
3171            })
3172            .expect_err("plan totals must match action totals");
3173
3174        assert!(error.contains("expected_usdc_recovered"));
3175        assert_eq!(state, before);
3176        assert!(!state
3177            .idempotency
3178            .contains_key(&request_uuid(46).to_string()));
3179    }
3180
3181    #[test]
3182    fn recovery_plan_journal_rejects_second_active_plan_for_account() {
3183        let mut state = configured_state(dec!(10_000));
3184        let active_plan_id = "pm-recovery-active-plan";
3185        state.accounts.insert(
3186            PmSettlementAccountKey {
3187                wallet: wallet(),
3188                underlying: "BTC".to_string(),
3189            },
3190            PmSettlementAccountState {
3191                wallet: wallet(),
3192                underlying: "BTC".to_string(),
3193                bridge_principal_usdc: Decimal::ZERO,
3194                debt_principal_usdc: dec!(100),
3195                accrued_interest_usdc: Decimal::ZERO,
3196                last_interest_accrual_ms: 1_000,
3197                policy_version: 1,
3198                bridge_deadline_ms: None,
3199                status: PmSettlementAccountStatus::Debt,
3200                active_recovery_plan_id: None,
3201                updated_at_ms: 1_000,
3202            },
3203        );
3204        state
3205            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3206                request_id: request_uuid(61),
3207                input_digest: "active-plan-digest".to_string(),
3208                plan: recovery_plan(active_plan_id),
3209                timestamp_ms: 1_000,
3210            })
3211            .expect("initial active plan should journal");
3212        let before = state.clone();
3213
3214        let error = state
3215            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3216                request_id: request_uuid(62),
3217                input_digest: "second-plan-digest".to_string(),
3218                plan: recovery_plan("pm-recovery-second-plan"),
3219                timestamp_ms: 1_100,
3220            })
3221            .expect_err("second non-terminal plan must reject");
3222
3223        assert!(error.contains("already has active plan"));
3224        assert_eq!(state, before);
3225        assert!(!state
3226            .idempotency
3227            .contains_key(&request_uuid(62).to_string()));
3228    }
3229
3230    #[test]
3231    fn recovery_plan_journal_rejects_overlap_without_account_row() {
3232        let mut state = configured_state(dec!(10_000));
3233        let first_plan_id = "pm-recovery-no-account-first";
3234        state
3235            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3236                request_id: request_uuid(75),
3237                input_digest: "no-account-first-digest".to_string(),
3238                plan: recovery_plan(first_plan_id),
3239                timestamp_ms: 1_000,
3240            })
3241            .expect("first no-account plan should journal");
3242        assert!(state
3243            .accounts
3244            .values()
3245            .all(|account| account.active_recovery_plan_id.as_deref() != Some(first_plan_id)));
3246        let before = state.clone();
3247
3248        let error = state
3249            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3250                request_id: request_uuid(76),
3251                input_digest: "no-account-second-digest".to_string(),
3252                plan: recovery_plan("pm-recovery-no-account-second"),
3253                timestamp_ms: 1_100,
3254            })
3255            .expect_err("blocking no-account plan must reject overlap");
3256
3257        assert!(error.contains("already has active plan"));
3258        assert_eq!(state, before);
3259        assert!(!state
3260            .idempotency
3261            .contains_key(&request_uuid(76).to_string()));
3262    }
3263
3264    #[test]
3265    fn recovery_plan_journal_submit_and_resolve_updates_state_machine() {
3266        let mut state = configured_state(dec!(10_000));
3267        let plan_id = "pm-recovery-test-1";
3268
3269        let effects = state
3270            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3271                request_id: request_uuid(50),
3272                input_digest: "recovery-plan-digest".to_string(),
3273                plan: recovery_plan(plan_id),
3274                timestamp_ms: 1_000,
3275            })
3276            .expect("plan should journal");
3277        assert!(matches!(
3278            effects.as_slice(),
3279            [PmSettlementProjectionEffect::RecoveryPlanUpsert(_)]
3280        ));
3281        let plan = state.recovery_plans.get(plan_id).unwrap();
3282        assert_eq!(plan.status, "Planned");
3283        assert_eq!(plan.actions[0].status, "Planned");
3284
3285        state
3286            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
3287                request_id: request_uuid(51),
3288                input_digest: "submit-digest".to_string(),
3289                wallet: wallet(),
3290                plan_id: plan_id.to_string(),
3291                action_index: 0,
3292                attempt: 1,
3293                external_id: "directive-1".to_string(),
3294                external_kind: PmRecoveryExternalKind::Directive,
3295                timestamp_ms: 1_100,
3296            })
3297            .expect("action should submit");
3298        let plan = state.recovery_plans.get(plan_id).unwrap();
3299        assert_eq!(plan.status, "Submitted");
3300        assert_eq!(plan.actions[0].attempt, 1);
3301        assert_eq!(
3302            plan.actions[0].submitted_external_id.as_deref(),
3303            Some("directive-1")
3304        );
3305
3306        state
3307            .apply_resolve_recovery_action(ResolvePmRecoveryActionCommand {
3308                request_id: request_uuid(52),
3309                input_digest: "resolve-digest".to_string(),
3310                wallet: wallet(),
3311                plan_id: plan_id.to_string(),
3312                action_index: 0,
3313                attempt: 1,
3314                result: PmRecoveryActionResult::Confirmed,
3315                recovered_usdc: dec!(75),
3316                liability_reduction_usdc: dec!(75),
3317                result_external_id: Some("directive-1".to_string()),
3318                timestamp_ms: 1_200,
3319            })
3320            .expect("action should resolve");
3321        let plan = state.recovery_plans.get(plan_id).unwrap();
3322        assert_eq!(plan.status, "Confirmed");
3323        assert_eq!(plan.actions[0].status, "Confirmed");
3324        assert_eq!(plan.actions[0].recovered_usdc, dec!(75));
3325        assert_eq!(plan.actions[0].liability_reduction_usdc, dec!(75));
3326    }
3327
3328    #[test]
3329    fn recovery_duplicate_resolution_is_idempotent_without_double_counting() {
3330        let mut state = configured_state(dec!(10_000));
3331        let plan_id = "pm-recovery-test-2";
3332        state
3333            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3334                request_id: request_uuid(60),
3335                input_digest: "plan-digest".to_string(),
3336                plan: recovery_plan(plan_id),
3337                timestamp_ms: 1_000,
3338            })
3339            .unwrap();
3340        state
3341            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
3342                request_id: request_uuid(61),
3343                input_digest: "submit-digest".to_string(),
3344                wallet: wallet(),
3345                plan_id: plan_id.to_string(),
3346                action_index: 0,
3347                attempt: 1,
3348                external_id: "directive-2".to_string(),
3349                external_kind: PmRecoveryExternalKind::Directive,
3350                timestamp_ms: 1_100,
3351            })
3352            .unwrap();
3353        let command = ResolvePmRecoveryActionCommand {
3354            request_id: request_uuid(62),
3355            input_digest: "resolve-digest".to_string(),
3356            wallet: wallet(),
3357            plan_id: plan_id.to_string(),
3358            action_index: 0,
3359            attempt: 1,
3360            result: PmRecoveryActionResult::Confirmed,
3361            recovered_usdc: dec!(75),
3362            liability_reduction_usdc: dec!(75),
3363            result_external_id: Some("directive-2".to_string()),
3364            timestamp_ms: 1_200,
3365        };
3366
3367        state
3368            .apply_resolve_recovery_action(command.clone())
3369            .expect("first resolution applies");
3370        let after_first = state.clone();
3371        let effects = state
3372            .apply_resolve_recovery_action(command.clone())
3373            .expect("duplicate resolution is idempotent");
3374
3375        assert!(effects.is_empty());
3376        assert_eq!(state, after_first);
3377
3378        let mut conflicting = after_first.clone();
3379        let mut conflict_command = command.clone();
3380        conflict_command.request_id = request_uuid(63);
3381        conflict_command.input_digest = "resolve-conflict-digest".to_string();
3382        conflict_command.recovered_usdc = dec!(76);
3383        let error = conflicting
3384            .apply_resolve_recovery_action(conflict_command)
3385            .expect_err("conflicting duplicate resolution must reject");
3386
3387        assert!(error.contains("conflicting PM recovery action resolution"));
3388        assert_eq!(conflicting, after_first);
3389    }
3390
3391    #[test]
3392    fn failed_recovery_resolution_rejects_value_amounts() {
3393        let mut state = configured_state(dec!(10_000));
3394        let plan_id = "pm-recovery-failed-with-value";
3395        state
3396            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3397                request_id: request_uuid(69),
3398                input_digest: "plan-digest".to_string(),
3399                plan: recovery_plan(plan_id),
3400                timestamp_ms: 1_000,
3401            })
3402            .unwrap();
3403        state
3404            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
3405                request_id: request_uuid(70),
3406                input_digest: "submit-digest".to_string(),
3407                wallet: wallet(),
3408                plan_id: plan_id.to_string(),
3409                action_index: 0,
3410                attempt: 1,
3411                external_id: "directive-failed-with-value".to_string(),
3412                external_kind: PmRecoveryExternalKind::Directive,
3413                timestamp_ms: 1_100,
3414            })
3415            .unwrap();
3416        let before = state.clone();
3417
3418        let error = state
3419            .apply_resolve_recovery_action(ResolvePmRecoveryActionCommand {
3420                request_id: request_uuid(71),
3421                input_digest: "failed-value-resolve-digest".to_string(),
3422                wallet: wallet(),
3423                plan_id: plan_id.to_string(),
3424                action_index: 0,
3425                attempt: 1,
3426                result: PmRecoveryActionResult::Failed,
3427                recovered_usdc: dec!(1),
3428                liability_reduction_usdc: Decimal::ZERO,
3429                result_external_id: Some("failure-ref".to_string()),
3430                timestamp_ms: 1_200,
3431            })
3432            .expect_err("failed resolution must not carry recovered value");
3433
3434        assert!(error.contains("cannot carry recovered or liability reduction amounts"));
3435        assert_eq!(state, before);
3436        assert!(!state
3437            .idempotency
3438            .contains_key(&request_uuid(71).to_string()));
3439    }
3440
3441    #[test]
3442    fn duplicate_recovery_submission_rejects_conflicting_external_kind() {
3443        let mut state = configured_state(dec!(10_000));
3444        let plan_id = "pm-recovery-submit-kind";
3445        state
3446            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3447                request_id: request_uuid(70),
3448                input_digest: "plan-digest".to_string(),
3449                plan: recovery_plan(plan_id),
3450                timestamp_ms: 1_000,
3451            })
3452            .unwrap();
3453        state
3454            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
3455                request_id: request_uuid(71),
3456                input_digest: "submit-digest".to_string(),
3457                wallet: wallet(),
3458                plan_id: plan_id.to_string(),
3459                action_index: 0,
3460                attempt: 1,
3461                external_id: "same-external-id".to_string(),
3462                external_kind: PmRecoveryExternalKind::Directive,
3463                timestamp_ms: 1_100,
3464            })
3465            .unwrap();
3466        let before = state.clone();
3467
3468        let error = state
3469            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
3470                request_id: request_uuid(72),
3471                input_digest: "submit-conflicting-kind-digest".to_string(),
3472                wallet: wallet(),
3473                plan_id: plan_id.to_string(),
3474                action_index: 0,
3475                attempt: 1,
3476                external_id: "same-external-id".to_string(),
3477                external_kind: PmRecoveryExternalKind::EngineOrder,
3478                timestamp_ms: 1_200,
3479            })
3480            .expect_err("conflicting external kind must reject");
3481
3482        assert!(error.contains("conflicting PM recovery action submission"));
3483        assert_eq!(state, before);
3484    }
3485
3486    #[test]
3487    fn recovery_submission_rejects_projection_oversized_attempt() {
3488        let mut state = configured_state(dec!(10_000));
3489        let plan_id = "pm-recovery-oversized-attempt";
3490        state
3491            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3492                request_id: request_uuid(73),
3493                input_digest: "plan-digest".to_string(),
3494                plan: recovery_plan(plan_id),
3495                timestamp_ms: 1_000,
3496            })
3497            .unwrap();
3498        let before = state.clone();
3499
3500        let error = state
3501            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
3502                request_id: request_uuid(74),
3503                input_digest: "oversized-attempt-digest".to_string(),
3504                wallet: wallet(),
3505                plan_id: plan_id.to_string(),
3506                action_index: 0,
3507                attempt: i32::MAX as u32 + 1,
3508                external_id: "oversized-attempt".to_string(),
3509                external_kind: PmRecoveryExternalKind::Directive,
3510                timestamp_ms: 1_100,
3511            })
3512            .expect_err("oversized attempt must reject before journaling");
3513
3514        assert!(error.contains("attempt"));
3515        assert_eq!(state, before);
3516        assert!(!state
3517            .idempotency
3518            .contains_key(&request_uuid(74).to_string()));
3519    }
3520
3521    #[test]
3522    fn failed_recovery_plan_remains_active_for_retry() {
3523        let mut state = configured_state(dec!(10_000));
3524        let plan_id = "pm-recovery-test-failed";
3525        state.accounts.insert(
3526            PmSettlementAccountKey {
3527                wallet: wallet(),
3528                underlying: "BTC".to_string(),
3529            },
3530            PmSettlementAccountState {
3531                wallet: wallet(),
3532                underlying: "BTC".to_string(),
3533                bridge_principal_usdc: Decimal::ZERO,
3534                debt_principal_usdc: dec!(100),
3535                accrued_interest_usdc: Decimal::ZERO,
3536                last_interest_accrual_ms: 1_000,
3537                policy_version: 1,
3538                bridge_deadline_ms: None,
3539                status: PmSettlementAccountStatus::Debt,
3540                active_recovery_plan_id: None,
3541                updated_at_ms: 1_000,
3542            },
3543        );
3544        state
3545            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3546                request_id: request_uuid(64),
3547                input_digest: "plan-digest".to_string(),
3548                plan: recovery_plan(plan_id),
3549                timestamp_ms: 1_000,
3550            })
3551            .unwrap();
3552        state
3553            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
3554                request_id: request_uuid(65),
3555                input_digest: "submit-digest".to_string(),
3556                wallet: wallet(),
3557                plan_id: plan_id.to_string(),
3558                action_index: 0,
3559                attempt: 1,
3560                external_id: "directive-failed".to_string(),
3561                external_kind: PmRecoveryExternalKind::Directive,
3562                timestamp_ms: 1_100,
3563            })
3564            .unwrap();
3565
3566        state
3567            .apply_resolve_recovery_action(ResolvePmRecoveryActionCommand {
3568                request_id: request_uuid(66),
3569                input_digest: "resolve-digest".to_string(),
3570                wallet: wallet(),
3571                plan_id: plan_id.to_string(),
3572                action_index: 0,
3573                attempt: 1,
3574                result: PmRecoveryActionResult::Failed,
3575                recovered_usdc: dec!(0),
3576                liability_reduction_usdc: dec!(0),
3577                result_external_id: Some("failure-ref".to_string()),
3578                timestamp_ms: 1_200,
3579            })
3580            .expect("failed action should resolve and remain retryable");
3581
3582        let plan = state.recovery_plans.get(plan_id).unwrap();
3583        assert_eq!(plan.status, "Failed");
3584        let account = state
3585            .accounts
3586            .get(&PmSettlementAccountKey {
3587                wallet: wallet(),
3588                underlying: "BTC".to_string(),
3589            })
3590            .unwrap();
3591        assert_eq!(account.active_recovery_plan_id.as_deref(), Some(plan_id));
3592
3593        state
3594            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
3595                request_id: request_uuid(67),
3596                input_digest: "retry-submit-digest".to_string(),
3597                wallet: wallet(),
3598                plan_id: plan_id.to_string(),
3599                action_index: 0,
3600                attempt: 2,
3601                external_id: "directive-retry".to_string(),
3602                external_kind: PmRecoveryExternalKind::Directive,
3603                timestamp_ms: 1_300,
3604            })
3605            .expect("failed action should allow retry submission");
3606        assert_eq!(
3607            state.recovery_plans.get(plan_id).unwrap().actions[0].status,
3608            "Submitted"
3609        );
3610        let retried_action = &state.recovery_plans.get(plan_id).unwrap().actions[0];
3611        assert_eq!(retried_action.result, None);
3612        assert_eq!(retried_action.result_external_id, None);
3613        assert_eq!(retried_action.recovered_usdc, Decimal::ZERO);
3614        assert_eq!(retried_action.liability_reduction_usdc, Decimal::ZERO);
3615
3616        state
3617            .apply_resolve_recovery_action(ResolvePmRecoveryActionCommand {
3618                request_id: request_uuid(68),
3619                input_digest: "retry-resolve-digest".to_string(),
3620                wallet: wallet(),
3621                plan_id: plan_id.to_string(),
3622                action_index: 0,
3623                attempt: 2,
3624                result: PmRecoveryActionResult::Confirmed,
3625                recovered_usdc: dec!(25),
3626                liability_reduction_usdc: dec!(25),
3627                result_external_id: Some("directive-retry".to_string()),
3628                timestamp_ms: 1_400,
3629            })
3630            .expect("retry resolution should not conflict with previous failed result");
3631        assert_eq!(
3632            state.recovery_plans.get(plan_id).unwrap().actions[0].result,
3633            Some("Confirmed".to_string())
3634        );
3635    }
3636
3637    #[test]
3638    fn confirmed_and_canceled_plan_stays_active_until_repayment() {
3639        let mut state = configured_state(dec!(10_000));
3640        let plan_id = "pm-recovery-confirmed-canceled";
3641        state.accounts.insert(
3642            PmSettlementAccountKey {
3643                wallet: wallet(),
3644                underlying: "BTC".to_string(),
3645            },
3646            PmSettlementAccountState {
3647                wallet: wallet(),
3648                underlying: "BTC".to_string(),
3649                bridge_principal_usdc: Decimal::ZERO,
3650                debt_principal_usdc: dec!(100),
3651                accrued_interest_usdc: Decimal::ZERO,
3652                last_interest_accrual_ms: 1_000,
3653                policy_version: 1,
3654                bridge_deadline_ms: None,
3655                status: PmSettlementAccountStatus::Debt,
3656                active_recovery_plan_id: None,
3657                updated_at_ms: 1_000,
3658            },
3659        );
3660        let pool = state.pools.get_mut("BTC").unwrap();
3661        pool.active_settlement_debt_usdc = dec!(100);
3662        pool.recompute_utilization().unwrap();
3663        let mut plan = recovery_plan(plan_id);
3664        let mut second_action = plan.actions[0].clone();
3665        second_action.action_index = 1;
3666        second_action.action = PmRecoveryActionKind::CancelOrder {
3667            order_id: 42,
3668            reason: PmRecoveryReason::SettlementDebt,
3669        };
3670        second_action.expected_usdc_recovered = Decimal::ZERO;
3671        second_action.expected_obligation_reduced = Decimal::ZERO;
3672        second_action.expected_impact_usdc = Decimal::ZERO;
3673        plan.actions.push(second_action);
3674        state
3675            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3676                request_id: request_uuid(80),
3677                input_digest: "plan-digest".to_string(),
3678                plan,
3679                timestamp_ms: 1_000,
3680            })
3681            .unwrap();
3682        state
3683            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
3684                request_id: request_uuid(81),
3685                input_digest: "submit-first-digest".to_string(),
3686                wallet: wallet(),
3687                plan_id: plan_id.to_string(),
3688                action_index: 0,
3689                attempt: 1,
3690                external_id: "directive-confirmed".to_string(),
3691                external_kind: PmRecoveryExternalKind::Directive,
3692                timestamp_ms: 1_100,
3693            })
3694            .unwrap();
3695        state
3696            .apply_resolve_recovery_action(ResolvePmRecoveryActionCommand {
3697                request_id: request_uuid(82),
3698                input_digest: "resolve-first-digest".to_string(),
3699                wallet: wallet(),
3700                plan_id: plan_id.to_string(),
3701                action_index: 0,
3702                attempt: 1,
3703                result: PmRecoveryActionResult::Confirmed,
3704                recovered_usdc: dec!(50),
3705                liability_reduction_usdc: dec!(50),
3706                result_external_id: Some("directive-confirmed".to_string()),
3707                timestamp_ms: 1_200,
3708            })
3709            .unwrap();
3710        state
3711            .apply_mark_recovery_action_submitted(MarkPmRecoveryActionSubmittedCommand {
3712                request_id: request_uuid(83),
3713                input_digest: "submit-second-digest".to_string(),
3714                wallet: wallet(),
3715                plan_id: plan_id.to_string(),
3716                action_index: 1,
3717                attempt: 1,
3718                external_id: "directive-canceled".to_string(),
3719                external_kind: PmRecoveryExternalKind::Directive,
3720                timestamp_ms: 1_300,
3721            })
3722            .unwrap();
3723        let effects = state
3724            .apply_resolve_recovery_action(ResolvePmRecoveryActionCommand {
3725                request_id: request_uuid(84),
3726                input_digest: "resolve-second-digest".to_string(),
3727                wallet: wallet(),
3728                plan_id: plan_id.to_string(),
3729                action_index: 1,
3730                attempt: 1,
3731                result: PmRecoveryActionResult::Canceled,
3732                recovered_usdc: Decimal::ZERO,
3733                liability_reduction_usdc: Decimal::ZERO,
3734                result_external_id: Some("directive-canceled".to_string()),
3735                timestamp_ms: 1_400,
3736            })
3737            .unwrap();
3738
3739        assert_eq!(
3740            state.recovery_plans.get(plan_id).unwrap().status,
3741            "PartiallyRepaid"
3742        );
3743        assert_eq!(
3744            state
3745                .accounts
3746                .get(&PmSettlementAccountKey {
3747                    wallet: wallet(),
3748                    underlying: "BTC".to_string()
3749                })
3750                .unwrap()
3751                .active_recovery_plan_id,
3752            Some(plan_id.to_string())
3753        );
3754        assert!(!effects.iter().any(|effect| matches!(
3755            effect,
3756            PmSettlementProjectionEffect::AccountUpsert(account)
3757                if account.active_recovery_plan_id.is_none()
3758        )));
3759
3760        let before_duplicate_plan = state.clone();
3761        let error = state
3762            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3763                request_id: request_uuid(85),
3764                input_digest: "duplicate-before-repayment-digest".to_string(),
3765                plan: recovery_plan("pm-recovery-before-repayment"),
3766                timestamp_ms: 1_500,
3767            })
3768            .expect_err("confirmed unrepaid recovery plan must block duplicate planning");
3769
3770        assert!(error.contains("already has active plan"));
3771        assert_eq!(state, before_duplicate_plan);
3772
3773        let effects = state
3774            .apply_repayment(ApplyPmSettlementRepaymentCommand {
3775                request_id: request_uuid(86),
3776                input_digest: "partial-confirmed-repayment-digest".to_string(),
3777                wallet: wallet(),
3778                underlying: "BTC".to_string(),
3779                amount_usdc: dec!(50),
3780                reason: "admin_test".to_string(),
3781                source_event_id: "source-1".to_string(),
3782                timestamp_ms: 1_600,
3783            })
3784            .expect("confirmed cash repayment should retire the finished plan");
3785        let account = state
3786            .accounts
3787            .get(&PmSettlementAccountKey {
3788                wallet: wallet(),
3789                underlying: "BTC".to_string(),
3790            })
3791            .unwrap();
3792        assert_eq!(account.debt_principal_usdc, dec!(50));
3793        assert_eq!(account.active_recovery_plan_id, None);
3794        assert_eq!(state.recovery_plans.get(plan_id).unwrap().status, "Repaid");
3795        assert!(effects.iter().any(|effect| matches!(
3796            effect,
3797            PmSettlementProjectionEffect::RecoveryPlanUpsert(plan)
3798                if plan.plan_id == plan_id && plan.status == "Repaid"
3799        )));
3800
3801        state
3802            .apply_journal_recovery_plan(JournalPmRecoveryPlanCommand {
3803                request_id: request_uuid(87),
3804                input_digest: "replacement-after-partial-repayment-digest".to_string(),
3805                plan: recovery_plan("pm-recovery-after-partial-repayment"),
3806                timestamp_ms: 1_700,
3807            })
3808            .expect("residual debt should allow a replacement recovery plan");
3809    }
3810}