Skip to main content

hypercall_db_diesel/
database_handler.rs

1//! Unified database handler that owns a connection pool and implements
2//! all persistence traits from `hypercall_db`.
3//!
4//! This is the standalone Diesel persistence layer. It has NO dependency
5//! on the root `hypercall` crate and owns its own `DbPool` directly.
6//! All SQL is inline or delegated to local modules (`crate::ledger_ops`,
7//! `crate::settlement_ops`, `crate::order_status_materialization`).
8//!
9//! Trait implementations live in sibling modules:
10//! - `crate::instruments` -- InstrumentReader / InstrumentWriter
11//! - `crate::tiers`       -- TierReader / TierWriter
12//! - `crate::mmp`         -- MmpConfigReader / MmpConfigWriter
13//! - `crate::orders`      -- OrderReader / OrderWriter + fill ledger helper
14//! - `crate::transaction` -- Transactional (+ DieselTransaction)
15//! - `crate::replay`      -- JournalReplayReader
16//! - `crate::rfq`         -- RfqReader / RfqWriter
17//! - `crate::settlements` -- SettlementReader / SettlementWriter
18
19use crate::db_auth::DbAuthConfig;
20use anyhow::{Context, Result};
21use diesel::backend::Backend;
22use diesel::connection::SimpleConnection;
23use diesel::migration::{
24    Migration, MigrationMetadata, MigrationName, MigrationSource, MigrationVersion,
25};
26use diesel::pg::Pg;
27use diesel::pg::PgConnection;
28use diesel::prelude::*;
29use diesel::r2d2::{self, ConnectionManager};
30use diesel::{Connection, RunQueryDsl};
31use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
32use std::fmt;
33use std::sync::Arc;
34use tracing::info;
35
36/// Connection manager that resolves the database URL on every `connect()` call.
37///
38/// In password mode the URL is a captured `String` (zero-cost clone via `Arc`).
39/// In RDS IAM mode the closure reads the latest cached token from the provider.
40pub struct DynConnectionManager {
41    url_source: Arc<dyn Fn() -> String + Send + Sync>,
42}
43
44impl DynConnectionManager {
45    /// Construct from a [`DbAuthConfig`]. Password mode captures a static URL;
46    /// IAM mode calls `current_url()` on each connection attempt.
47    pub fn new(auth: &DbAuthConfig) -> Self {
48        let auth = auth.clone();
49        Self {
50            url_source: Arc::new(move || auth.current_url()),
51        }
52    }
53}
54
55impl r2d2::ManageConnection for DynConnectionManager {
56    type Connection = PgConnection;
57    type Error = r2d2::Error;
58
59    fn connect(&self) -> std::result::Result<PgConnection, r2d2::Error> {
60        let url = (self.url_source)();
61        PgConnection::establish(&url).map_err(r2d2::Error::ConnectionError)
62    }
63
64    fn is_valid(&self, conn: &mut PgConnection) -> std::result::Result<(), r2d2::Error> {
65        conn.batch_execute("SELECT 1")
66            .map_err(r2d2::Error::QueryError)
67    }
68
69    fn has_broken(&self, conn: &mut PgConnection) -> bool {
70        use diesel::r2d2::R2D2Connection;
71        std::thread::panicking() || conn.is_broken()
72    }
73}
74
75impl std::fmt::Debug for DynConnectionManager {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("DynConnectionManager").finish()
78    }
79}
80
/// Synchronous r2d2 connection pool for Postgres, managed by
/// [`DynConnectionManager`] so each new connection re-resolves its URL.
pub type DbPool = diesel::r2d2::Pool<DynConnectionManager>;

/// Legacy pool type alias for backward compatibility with code that constructs
/// pools directly with Diesel's stock `ConnectionManager`.
pub type LegacyDbPool = diesel::r2d2::Pool<ConnectionManager<PgConnection>>;
87
88fn wallet_from_bytes(bytes: &[u8], label: &str) -> Result<hypercall_types::WalletAddress> {
89    let arr: [u8; 20] = bytes.try_into().map_err(|_| {
90        anyhow::anyhow!(
91            "invalid {} address length {}, expected 20",
92            label,
93            bytes.len()
94        )
95    })?;
96    Ok(hypercall_types::WalletAddress::from(arr))
97}
98
99fn rsm_deposit_match_from_row(
100    request_id: String,
101    account: Vec<u8>,
102    token: Vec<u8>,
103    tx_hash: String,
104    log_index: i64,
105    amount_wei: String,
106    label: &str,
107) -> Result<hypercall_db::RsmUsdcDepositMatch> {
108    let account = wallet_from_bytes(
109        account.as_slice(),
110        &format!("{label} account {tx_hash}:{log_index}"),
111    )?;
112    let token = wallet_from_bytes(
113        token.as_slice(),
114        &format!("{label} token {tx_hash}:{log_index}"),
115    )?;
116    Ok(hypercall_db::RsmUsdcDepositMatch {
117        request_id,
118        account,
119        tx_hash,
120        log_index,
121        amount_wei,
122        token,
123    })
124}
125
126/// Read the option token deployment parameters from environment variables.
127/// Used by `save_market_and_instrument_sync` to derive option token addresses.
128pub(crate) fn current_option_token_deployment() -> Result<hypercall_types::OptionTokenDeployment> {
129    use alloy::primitives::{Address, B256};
130    use std::str::FromStr;
131
132    let registry_str = std::env::var("OPTION_REGISTRY_ADDRESS").map_err(|_| {
133        anyhow::anyhow!(
134            "Option token derivation requires OPTION_REGISTRY_ADDRESS from backend contracts config"
135        )
136    })?;
137    let init_hash_str =
138        std::env::var("OPTION_TOKEN_BEACON_PROXY_INIT_CODE_HASH").map_err(|_| {
139            anyhow::anyhow!(
140            "Option token derivation requires OPTION_TOKEN_BEACON_PROXY_INIT_CODE_HASH from backend contracts config"
141        )
142        })?;
143
144    let option_registry = Address::from_str(registry_str.trim()).map_err(|e| {
145        anyhow::anyhow!(
146            "Option token derivation requires valid OPTION_REGISTRY_ADDRESS: {}",
147            e
148        )
149    })?;
150    let beacon_proxy_init_code_hash = B256::from_str(init_hash_str.trim()).map_err(|e| {
151        anyhow::anyhow!(
152            "Option token derivation requires valid OPTION_TOKEN_BEACON_PROXY_INIT_CODE_HASH: {}",
153            e
154        )
155    })?;
156
157    Ok(hypercall_types::OptionTokenDeployment::new(
158        option_registry,
159        beacon_proxy_init_code_hash,
160    ))
161}
162
/// Embed migrations relative to hypercall-rs/db-diesel/Cargo.toml.
pub const MIGRATIONS: EmbeddedMigrations = diesel_migrations::embed_migrations!("../../migrations");

/// Directory name of the one migration that `HypercallMigrations` swaps for a
/// statement-by-statement runner.
const ORDER_INFOS_ACTIVE_SYMBOL_INDEX_MIGRATION: &str =
    "2026-06-01-000001_order_infos_active_symbol_index";
168
169/// Diesel runs each embedded migration file with one `batch_execute` call.
170/// PostgreSQL rejects `CREATE/DROP INDEX CONCURRENTLY` when it appears in a
171/// multi-statement batch, even outside an explicit transaction. This source
172/// replaces the one affected migration with a splitter that executes each
173/// statement separately while preserving Diesel's migration ledger behavior.
174struct HypercallMigrations;
175
176impl MigrationSource<Pg> for HypercallMigrations {
177    fn migrations(&self) -> diesel::migration::Result<Vec<Box<dyn Migration<Pg>>>> {
178        MIGRATIONS
179            .migrations()?
180            .into_iter()
181            .map(|migration| {
182                if migration.name().to_string() == ORDER_INFOS_ACTIVE_SYMBOL_INDEX_MIGRATION {
183                    Ok(
184                        Box::new(SplitStatementMigration::order_infos_active_symbol_index())
185                            as Box<dyn Migration<Pg>>,
186                    )
187                } else {
188                    Ok(migration)
189                }
190            })
191            .collect()
192    }
193}
194
195#[derive(Debug)]
196struct SplitStatementMigration {
197    name: StaticMigrationName,
198    metadata: NonTransactionalMigrationMetadata,
199    up_sql: &'static str,
200    down_sql: &'static str,
201}
202
203impl SplitStatementMigration {
204    fn order_infos_active_symbol_index() -> Self {
205        Self {
206            name: StaticMigrationName(ORDER_INFOS_ACTIVE_SYMBOL_INDEX_MIGRATION),
207            metadata: NonTransactionalMigrationMetadata,
208            up_sql: include_str!(
209                "../../../migrations/2026-06-01-000001_order_infos_active_symbol_index/up.sql"
210            ),
211            down_sql: include_str!(
212                "../../../migrations/2026-06-01-000001_order_infos_active_symbol_index/down.sql"
213            ),
214        }
215    }
216
217    fn execute_statements<DB: Backend>(
218        &self,
219        conn: &mut dyn diesel::connection::BoxableConnection<DB>,
220        sql: &str,
221    ) -> diesel::migration::Result<()> {
222        for statement in sql.split(';').map(str::trim).filter(|s| !s.is_empty()) {
223            conn.batch_execute(statement).map_err(|error| {
224                let message = format!(
225                    "failed to run statement in migration {}: {}",
226                    self.name, error
227                );
228                Box::new(std::io::Error::new(std::io::ErrorKind::Other, message))
229                    as Box<dyn std::error::Error + Send + Sync>
230            })?;
231        }
232        Ok(())
233    }
234}
235
impl Migration<Pg> for SplitStatementMigration {
    /// Apply the migration by executing `up_sql` one statement at a time.
    fn run(
        &self,
        conn: &mut dyn diesel::connection::BoxableConnection<Pg>,
    ) -> diesel::migration::Result<()> {
        self.execute_statements(conn, self.up_sql)
    }

    /// Revert the migration by executing `down_sql` one statement at a time.
    fn revert(
        &self,
        conn: &mut dyn diesel::connection::BoxableConnection<Pg>,
    ) -> diesel::migration::Result<()> {
        self.execute_statements(conn, self.down_sql)
    }

    /// Metadata for this migration (reports that it must not run in a transaction).
    fn metadata(&self) -> &dyn MigrationMetadata {
        &self.metadata
    }

    /// The migration's name, from which its version is derived.
    fn name(&self) -> &dyn MigrationName {
        &self.name
    }
}
259
260#[derive(Debug)]
261struct StaticMigrationName(&'static str);
262
263impl MigrationName for StaticMigrationName {
264    fn version(&self) -> MigrationVersion<'_> {
265        self.0.split('_').next().unwrap_or(self.0).into()
266    }
267}
268
269impl fmt::Display for StaticMigrationName {
270    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271        f.write_str(self.0)
272    }
273}
274
/// Migration metadata that opts the migration out of being wrapped in a
/// transaction.
#[derive(Debug)]
struct NonTransactionalMigrationMetadata;

impl MigrationMetadata for NonTransactionalMigrationMetadata {
    /// Always `false`: the migration must run outside a transaction.
    fn run_in_transaction(&self) -> bool {
        false
    }
}
283
284/// Sets statement_timeout and lock_timeout on every new Postgres connection.
285#[derive(Debug)]
286struct PgTimeoutCustomizer {
287    statement_timeout_ms: u32,
288    lock_timeout_ms: u32,
289}
290
291impl r2d2::CustomizeConnection<PgConnection, r2d2::Error> for PgTimeoutCustomizer {
292    fn on_acquire(&self, conn: &mut PgConnection) -> std::result::Result<(), r2d2::Error> {
293        conn.batch_execute(&format!(
294            "SET statement_timeout = '{}'; SET lock_timeout = '{}'",
295            self.statement_timeout_ms, self.lock_timeout_ms,
296        ))
297        .map_err(r2d2::Error::QueryError)
298    }
299}
300
/// Redact credentials from a database URL for safe logging.
///
/// Only the URL *authority* (the part between `://` and the first `/`, `?`
/// or `#`) is searched for the credential separator, and the **last** `@` in
/// it is used. This way a password that itself contains `@` is never partially
/// echoed, and an `@` in the path or query string is not mistaken for
/// credentials.
///
/// Returns `postgres://***@<host and rest of URL>` when userinfo is present,
/// and the fully opaque `postgres://***` otherwise.
fn redact_database_url(url: &str) -> String {
    const REDACTED: &str = "postgres://***";

    // Skip the scheme if there is one; otherwise treat the whole input as
    // starting at the authority.
    let after_scheme = url.find("://").map_or(url, |idx| &url[idx + 3..]);
    let authority_len = after_scheme
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(after_scheme.len());

    match after_scheme[..authority_len].rfind('@') {
        Some(at) => format!("{REDACTED}@{}", &after_scheme[at + 1..]),
        None => REDACTED.to_string(),
    }
}
307
308/// Build a raw r2d2 pool with the given auth config, for callers that manage
309/// the pool lifecycle themselves (e.g. `integrated.rs`).
310pub fn build_db_pool(
311    auth: &DbAuthConfig,
312    pool_max: u32,
313    statement_timeout_ms: u32,
314    lock_timeout_ms: u32,
315) -> Result<DbPool> {
316    let manager = DynConnectionManager::new(auth);
317    r2d2::Pool::builder()
318        .max_size(pool_max)
319        .min_idle(Some(0))
320        .connection_timeout(std::time::Duration::from_secs(5))
321        .idle_timeout(Some(std::time::Duration::from_secs(300)))
322        .connection_customizer(Box::new(PgTimeoutCustomizer {
323            statement_timeout_ms,
324            lock_timeout_ms,
325        }))
326        .build(manager)
327        .with_context(|| "Failed to create connection pool")
328}
329
/// Synchronous persistence handler. Owns an r2d2 connection pool and
/// implements all engine-path traits from `hypercall_db` (orders,
/// instruments, tiers, settlements, journal replay, archiver, RFQ).
///
/// [`Self::new`], [`Self::new_with_pool_size`], [`Self::new_with_auth`] and
/// [`Self::with_pool`] run Diesel migrations and the `ensure_*` schema checks.
/// The readonly constructors ([`Self::new_readonly`],
/// [`Self::new_readonly_auth`]), the custom-timeout constructors
/// ([`Self::new_with_timeouts`], [`Self::new_with_timeouts_auth`]) and
/// [`Self::with_pool_no_migrations`] skip them.
pub struct DatabaseHandler {
    // Shared so `pool()` can hand out cheap `Arc` clones.
    pool: Arc<DbPool>,
}
339
340impl DatabaseHandler {
341    /// Create a new handler: build pool + run migrations.
342    pub fn new(database_url: &str) -> Result<Self> {
343        Self::new_with_pool_size(database_url, 3)
344    }
345
346    /// Create a new handler with explicit pool size: build pool + run migrations.
347    pub fn new_with_pool_size(database_url: &str, pool_max: u32) -> Result<Self> {
348        Self::new_with_auth(DbAuthConfig::password(database_url), pool_max)
349    }
350
351    /// Create a new handler with [`DbAuthConfig`]: build pool + run migrations.
352    pub fn new_with_auth(auth: DbAuthConfig, pool_max: u32) -> Result<Self> {
353        let handler = Self::build_pool(&auth, pool_max, 30_000, 10_000)?;
354        handler.run_migrations().with_context(|| {
355            format!(
356                "Failed to run migrations for database (auth={:?}): {}",
357                auth,
358                redact_database_url(&auth.current_url())
359            )
360        })?;
361        Ok(handler)
362    }
363
364    /// Create a handler with custom statement/lock timeouts, no migrations.
365    /// Used by db-archiver which needs longer timeouts for batch operations.
366    pub fn new_with_timeouts(
367        database_url: &str,
368        pool_max: u32,
369        statement_timeout_ms: u32,
370        lock_timeout_ms: u32,
371    ) -> Result<Self> {
372        Self::new_with_timeouts_auth(
373            DbAuthConfig::password(database_url),
374            pool_max,
375            statement_timeout_ms,
376            lock_timeout_ms,
377        )
378    }
379
380    /// Create a handler with custom timeouts and [`DbAuthConfig`], no migrations.
381    pub fn new_with_timeouts_auth(
382        auth: DbAuthConfig,
383        pool_max: u32,
384        statement_timeout_ms: u32,
385        lock_timeout_ms: u32,
386    ) -> Result<Self> {
387        let redacted = redact_database_url(&auth.current_url());
388        info!(
389            "Initializing DatabaseHandler (custom timeouts: stmt={}ms, lock={}ms): {}",
390            statement_timeout_ms, lock_timeout_ms, redacted
391        );
392        Self::build_pool(&auth, pool_max, statement_timeout_ms, lock_timeout_ms)
393    }
394
395    /// Create a handler with pool only, no migrations (for read-only replicas).
396    pub fn new_readonly(database_url: &str, pool_max: u32) -> Result<Self> {
397        Self::new_readonly_auth(DbAuthConfig::password(database_url), pool_max)
398    }
399
400    /// Create a readonly handler with [`DbAuthConfig`], no migrations.
401    pub fn new_readonly_auth(auth: DbAuthConfig, pool_max: u32) -> Result<Self> {
402        let redacted = redact_database_url(&auth.current_url());
403        info!(
404            "Initializing readonly DatabaseHandler (migrations skipped): {}",
405            redacted
406        );
407        Self::build_pool(&auth, pool_max, 30_000, 10_000)
408    }
409
410    /// Wrap an existing pool + run migrations.
411    pub fn with_pool(pool: Arc<DbPool>) -> Result<Self> {
412        info!("Creating DatabaseHandler with existing pool");
413        let handler = Self { pool };
414        handler
415            .run_migrations()
416            .with_context(|| "Failed to run migrations")?;
417        Ok(handler)
418    }
419
420    /// Wrap an existing pool, skip migrations.
421    pub fn with_pool_no_migrations(pool: Arc<DbPool>) -> Self {
422        info!("Creating DatabaseHandler with existing pool (migrations skipped)");
423        Self { pool }
424    }
425
426    /// Expose the pool (Arc-cloned).
427    pub fn pool(&self) -> Arc<DbPool> {
428        self.pool.clone()
429    }
430
431    /// Clone the inner pool (not the Arc).
432    pub fn get_pool(&self) -> DbPool {
433        (*self.pool).clone()
434    }
435
436    /// Run pending Diesel migrations + ensure enum values.
437    pub fn run_migrations(&self) -> Result<()> {
438        let mut conn = self.pool.get()?;
439
440        match conn.run_pending_migrations(HypercallMigrations) {
441            Ok(migrations) => {
442                info!(
443                    "Database migrations completed successfully: {} migrations applied",
444                    migrations.len()
445                );
446            }
447            Err(e) => return Err(anyhow::anyhow!("Migration failed: {}", e)),
448        }
449
450        Self::ensure_enum_values(&mut conn)?;
451        Self::ensure_directive_action_key_enum(&mut conn)?;
452        Self::ensure_directive_outbox_wallet_address(&mut conn)?;
453        Self::ensure_real_liquidation_schema(&mut conn)?;
454
455        Ok(())
456    }
457
458    fn ensure_real_liquidation_schema(conn: &mut PgConnection) -> Result<()> {
459        diesel::sql_query(
460            "ALTER TABLE liquidation_states
461                ADD COLUMN IF NOT EXISTS liquidation_mode TEXT,
462                ADD COLUMN IF NOT EXISTS target_equity NUMERIC,
463                ADD COLUMN IF NOT EXISTS escalation_deadline BIGINT,
464                ADD COLUMN IF NOT EXISTS last_reprice_at BIGINT,
465                ADD COLUMN IF NOT EXISTS partial_order_request_ids JSONB,
466                ADD COLUMN IF NOT EXISTS partial_order_client_ids JSONB,
467                ADD COLUMN IF NOT EXISTS partial_bonus_bps INTEGER,
468                ADD COLUMN IF NOT EXISTS request_id TEXT,
469                ADD COLUMN IF NOT EXISTS tx_hash TEXT,
470                ADD COLUMN IF NOT EXISTS chain_start_time BIGINT,
471                ADD COLUMN IF NOT EXISTS margin_needed NUMERIC,
472                ADD COLUMN IF NOT EXISTS stop_request_id TEXT,
473                ADD COLUMN IF NOT EXISTS stop_tx_hash TEXT,
474                ADD COLUMN IF NOT EXISTS resolved_winner BYTEA,
475                ADD COLUMN IF NOT EXISTS resolved_bonus NUMERIC,
476                ADD COLUMN IF NOT EXISTS resolution_tx_hash TEXT,
477                ADD COLUMN IF NOT EXISTS last_observed_block BIGINT,
478                ADD COLUMN IF NOT EXISTS updated_at_ms BIGINT",
479        )
480        .execute(conn)
481        .map_err(|e| anyhow::anyhow!("Failed to ensure liquidation_states schema: {}", e))?;
482
483        diesel::sql_query(
484            "ALTER TABLE liquidation_history
485                ADD COLUMN IF NOT EXISTS liquidation_mode TEXT,
486                ADD COLUMN IF NOT EXISTS maintenance_margin NUMERIC,
487                ADD COLUMN IF NOT EXISTS request_id TEXT,
488                ADD COLUMN IF NOT EXISTS tx_hash TEXT,
489                ADD COLUMN IF NOT EXISTS margin_needed NUMERIC,
490                ADD COLUMN IF NOT EXISTS winner_address BYTEA,
491                ADD COLUMN IF NOT EXISTS bonus NUMERIC,
492                ADD COLUMN IF NOT EXISTS details JSONB NOT NULL DEFAULT '{}'::jsonb",
493        )
494        .execute(conn)
495        .map_err(|e| anyhow::anyhow!("Failed to ensure liquidation_history schema: {}", e))?;
496
497        diesel::sql_query(
498            "UPDATE liquidation_history
499             SET maintenance_margin = equity - mm_required
500             WHERE maintenance_margin IS NULL",
501        )
502        .execute(conn)
503        .map_err(|e| {
504            anyhow::anyhow!(
505                "Failed to backfill liquidation_history.maintenance_margin: {}",
506                e
507            )
508        })?;
509
510        diesel::sql_query(
511            "ALTER TABLE liquidation_history
512             ALTER COLUMN maintenance_margin SET NOT NULL",
513        )
514        .execute(conn)
515        .map_err(|e| {
516            anyhow::anyhow!(
517                "Failed to enforce liquidation_history.maintenance_margin not-null: {}",
518                e
519            )
520        })?;
521
522        diesel::sql_query(
523            "CREATE UNIQUE INDEX IF NOT EXISTS idx_liquidation_history_replay_guard
524             ON liquidation_history (
525                 wallet_address, previous_state, new_state, timestamp,
526                 COALESCE(auction_id, ''), COALESCE(request_id, ''), COALESCE(tx_hash, '')
527             )",
528        )
529        .execute(conn)
530        .map_err(|e| anyhow::anyhow!("Failed to ensure liquidation_history replay index: {}", e))?;
531
532        diesel::sql_query(
533            "ALTER TABLE liquidation_auctions
534                ADD COLUMN IF NOT EXISTS target_equity NUMERIC,
535                ADD COLUMN IF NOT EXISTS request_id TEXT,
536                ADD COLUMN IF NOT EXISTS tx_hash TEXT,
537                ADD COLUMN IF NOT EXISTS chain_start_time BIGINT,
538                ADD COLUMN IF NOT EXISTS margin_needed NUMERIC,
539                ADD COLUMN IF NOT EXISTS stop_request_id TEXT,
540                ADD COLUMN IF NOT EXISTS stop_tx_hash TEXT,
541                ADD COLUMN IF NOT EXISTS bonus NUMERIC,
542                ADD COLUMN IF NOT EXISTS last_observed_block BIGINT",
543        )
544        .execute(conn)
545        .map_err(|e| anyhow::anyhow!("Failed to ensure liquidation_auctions schema: {}", e))?;
546
547        diesel::sql_query(
548            "CREATE TABLE IF NOT EXISTS rsm_signer_nonces (
549                signer_address BYTEA PRIMARY KEY,
550                next_nonce BIGINT NOT NULL,
551                last_synced_nonce BIGINT,
552                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
553                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
554            )",
555        )
556        .execute(conn)
557        .map_err(|e| anyhow::anyhow!("Failed to ensure rsm_signer_nonces table: {}", e))?;
558
559        diesel::sql_query(
560            "CREATE TABLE IF NOT EXISTS rsm_signer_requests (
561                request_id TEXT PRIMARY KEY,
562                signer_address BYTEA NOT NULL,
563                account_address BYTEA NOT NULL,
564                action BYTEA NOT NULL,
565                nonce BIGINT NOT NULL,
566                status TEXT NOT NULL,
567                directive BYTEA,
568                signature TEXT,
569                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
570                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
571            )",
572        )
573        .execute(conn)
574        .map_err(|e| anyhow::anyhow!("Failed to ensure rsm_signer_requests table: {}", e))?;
575
576        diesel::sql_query(
577            "ALTER TABLE rsm_signer_requests ADD COLUMN IF NOT EXISTS signer_address BYTEA",
578        )
579        .execute(conn)
580        .map_err(|e| {
581            anyhow::anyhow!(
582                "Failed to ensure rsm_signer_requests signer_address column: {}",
583                e
584            )
585        })?;
586
587        diesel::sql_query("DELETE FROM rsm_signer_requests WHERE signer_address IS NULL")
588            .execute(conn)
589            .map_err(|e| {
590                anyhow::anyhow!(
591                    "Failed to remove rsm_signer_requests rows missing signer_address: {}",
592                    e
593                )
594            })?;
595
596        diesel::sql_query(
597            "ALTER TABLE rsm_signer_requests ALTER COLUMN signer_address SET NOT NULL",
598        )
599        .execute(conn)
600        .map_err(|e| {
601            anyhow::anyhow!(
602                "Failed to require rsm_signer_requests signer_address: {}",
603                e
604            )
605        })?;
606
607        Ok(())
608    }
609
610    // ------------------------------------------------------------------
611    // Internal helpers
612    // ------------------------------------------------------------------
613
614    fn build_pool(
615        auth: &DbAuthConfig,
616        pool_max: u32,
617        statement_timeout_ms: u32,
618        lock_timeout_ms: u32,
619    ) -> Result<Self> {
620        let redacted = redact_database_url(&auth.current_url());
621        info!(
622            "Initializing DatabaseHandler (auth={:?}): {}",
623            auth, redacted
624        );
625        let manager = DynConnectionManager::new(auth);
626        let pool = r2d2::Pool::builder()
627            .max_size(pool_max)
628            .min_idle(Some(0))
629            .connection_timeout(std::time::Duration::from_secs(5))
630            .idle_timeout(Some(std::time::Duration::from_secs(300)))
631            .connection_customizer(Box::new(PgTimeoutCustomizer {
632                statement_timeout_ms,
633                lock_timeout_ms,
634            }))
635            .build(manager)
636            .with_context(|| {
637                format!(
638                    "Failed to create connection pool for database: {}",
639                    redacted
640                )
641            })?;
642
643        Ok(Self {
644            pool: Arc::new(pool),
645        })
646    }
647
648    /// Ensure all required Postgres enum values exist.
649    fn ensure_enum_values(conn: &mut PgConnection) -> Result<()> {
650        let stmts = [
651            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'RfqExecute'",
652            "ALTER TYPE engine_event_type ADD VALUE IF NOT EXISTS 'RfqFilled'",
653            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'ReplaceOrder' AFTER 'CancelOrder'",
654            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'PriceUpdate'",
655            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'IvUpdate'",
656            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'TierUpdate'",
657            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'HypercorePositionUpdate'",
658            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'MmpConfigUpdate'",
659            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'LiquidationBonusUpdate'",
660            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'ApproveAgent'",
661            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'RevokeAgent'",
662            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'NonceAdvance'",
663            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'HypercoreEquityUpdate'",
664            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'OptionDepositUpdate'",
665            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'OptionWithdrawalUpdate'",
666            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'CashWithdrawalUpdate'",
667            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'RecordPmVaultDeposit'",
668            "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'RequestPmVaultWithdrawal'",
669        ];
670
671        for stmt in &stmts {
672            diesel::sql_query(*stmt)
673                .execute(conn)
674                .map_err(|e| anyhow::anyhow!("Failed to ensure enum value: {} - {}", stmt, e))?;
675        }
676
677        info!("Ensured all required Postgres enum values exist");
678        Ok(())
679    }
680
681    /// Ensure the directive outbox action-key enum exists even if an older
682    /// duplicate-version migration was recorded before this enum migration ran.
683    fn ensure_directive_action_key_enum(conn: &mut PgConnection) -> Result<()> {
684        diesel::sql_query(
685            r#"
686            DO $$
687            BEGIN
688                IF NOT EXISTS (
689                    SELECT 1
690                    FROM pg_type
691                    WHERE typname = 'directive_action_key'
692                ) THEN
693                    CREATE TYPE directive_action_key AS ENUM (
694                        'hl_limit_order',
695                        'hl_cancel_by_oid',
696                        'hl_cancel_by_cloid',
697                        'hc_update_api_wallet',
698                        'hl_send_asset',
699                        'hc_transfer_option',
700                        'rsm_hl_limit_order',
701                        'rsm_hl_cancel_by_oid',
702                        'rsm_hl_cancel_by_cloid',
703                        'rsm_hl_send_asset',
704                        'system_credit_token',
705                        'system_credit_option',
706                        'system_start_liquidation',
707                        'system_stop_liquidation',
708                        'system_withdraw_token'
709                    );
710                END IF;
711            END $$;
712            "#,
713        )
714        .execute(conn)
715        .context("Failed to ensure directive_action_key enum type exists")?;
716
717        let values = [
718            "hl_limit_order",
719            "hl_cancel_by_oid",
720            "hl_cancel_by_cloid",
721            "hc_update_api_wallet",
722            "hl_send_asset",
723            "hc_transfer_option",
724            "rsm_hl_limit_order",
725            "rsm_hl_cancel_by_oid",
726            "rsm_hl_cancel_by_cloid",
727            "rsm_hl_send_asset",
728            "system_credit_token",
729            "system_credit_option",
730            "system_start_liquidation",
731            "system_stop_liquidation",
732            "system_withdraw_token",
733        ];
734        for value in values {
735            diesel::sql_query(format!(
736                "ALTER TYPE directive_action_key ADD VALUE IF NOT EXISTS '{}'",
737                value
738            ))
739            .execute(conn)
740            .with_context(|| format!("Failed to ensure directive_action_key enum value {value}"))?;
741        }
742
743        diesel::sql_query(
744            r#"
745            ALTER TABLE directive_outbox
746                ALTER COLUMN action_key TYPE directive_action_key
747                USING action_key::text::directive_action_key
748            "#,
749        )
750        .execute(conn)
751        .context("Failed to ensure directive_outbox.action_key uses directive_action_key")?;
752
753        info!("Ensured directive_action_key enum and directive_outbox schema exist");
754        Ok(())
755    }
756
757    fn ensure_directive_outbox_wallet_address(conn: &mut PgConnection) -> Result<()> {
758        diesel::sql_query(
759            r#"
760            ALTER TABLE directive_outbox
761                ADD COLUMN IF NOT EXISTS wallet_address BYTEA
762            "#,
763        )
764        .execute(conn)
765        .context("Failed to ensure directive_outbox.wallet_address column exists")?;
766
767        diesel::sql_query(
768            r#"
769            UPDATE directive_outbox
770            SET wallet_address = account_address
771            WHERE wallet_address IS NULL
772            "#,
773        )
774        .execute(conn)
775        .context("Failed to backfill directive_outbox.wallet_address")?;
776
777        diesel::sql_query(
778            r#"
779            ALTER TABLE directive_outbox
780                ALTER COLUMN wallet_address SET NOT NULL
781            "#,
782        )
783        .execute(conn)
784        .context("Failed to ensure directive_outbox.wallet_address is NOT NULL")?;
785
786        diesel::sql_query(
787            r#"
788            CREATE INDEX IF NOT EXISTS idx_directive_outbox_withdrawal_wallet_created
789                ON directive_outbox (wallet_address, created_ts_ms DESC)
790            "#,
791        )
792        .execute(conn)
793        .context("Failed to ensure directive_outbox wallet history index")?;
794
795        info!("Ensured directive_outbox.wallet_address schema exists");
796        Ok(())
797    }
798
799    /// Map an `OrderUpdateStatus` enum to its DB string representation.
800    pub(crate) fn order_update_status_to_db(
801        status: hypercall_types::OrderUpdateStatus,
802    ) -> &'static str {
803        match status {
804            hypercall_types::OrderUpdateStatus::Acked => "ACKED",
805            hypercall_types::OrderUpdateStatus::Open => "OPEN",
806            hypercall_types::OrderUpdateStatus::PartiallyFilled => "PARTIALLY_FILLED",
807            hypercall_types::OrderUpdateStatus::Filled => "FILLED",
808            hypercall_types::OrderUpdateStatus::Canceled => "CANCELED",
809            hypercall_types::OrderUpdateStatus::Rejected => "REJECTED",
810        }
811    }
812
813    /// Log and count option_token_address unique constraint violations.
814    pub(crate) fn observe_diesel_option_token_violation(err: &diesel::result::Error) {
815        if let diesel::result::Error::DatabaseError(
816            diesel::result::DatabaseErrorKind::UniqueViolation,
817            ref info,
818        ) = err
819        {
820            if info
821                .constraint_name()
822                .is_some_and(|c| c.contains("option_token_address"))
823            {
824                tracing::warn!(
825                    "Option token address unique constraint violation (likely re-listing): {}",
826                    info.message()
827                );
828                metrics::counter!("ht_diesel_option_token_violation_total").increment(1);
829            }
830        }
831    }
832
833    /// Persist the engine state snapshot's last_command_id to Postgres so
834    /// the db-archiver can compute a safe deletion boundary. Must be called
835    /// after every successful snapshot write to disk.
836    pub fn update_snapshot_boundary_sync(&self, last_command_id: i64) -> Result<()> {
837        let mut conn = self.pool.get()?;
838        diesel::sql_query(
839            "INSERT INTO engine_snapshot_boundary (id, last_command_id, updated_at) \
840             VALUES (1, $1, now()) \
841             ON CONFLICT (id) DO UPDATE SET last_command_id = $1, updated_at = now()",
842        )
843        .bind::<diesel::sql_types::BigInt, _>(last_command_id)
844        .execute(&mut conn)?;
845        Ok(())
846    }
847
848    // ===== Directive deposit/withdrawal methods =====
849
850    /// Check whether a directive outbox row exists for the given directive_id.
851    pub fn directive_outbox_exists_sync(&self, directive_id: &str) -> Result<bool> {
852        use diesel::sql_types::Text;
853
854        #[derive(diesel::QueryableByName)]
855        struct ExistsRow {
856            #[diesel(sql_type = diesel::sql_types::Bool)]
857            exists: bool,
858        }
859
860        let mut conn = self.pool.get()?;
861        let row = diesel::sql_query(
862            "SELECT EXISTS (
863                 SELECT 1 FROM directive_outbox WHERE directive_id = $1
864             ) AS exists",
865        )
866        .bind::<Text, _>(directive_id)
867        .get_result::<ExistsRow>(&mut conn)?;
868        Ok(row.exists)
869    }
870
871    /// Get the maximum observed block from rsm_deposit_credits.
872    pub async fn get_max_rsm_deposit_credit_observed_block(&self) -> Result<Option<u64>> {
873        use diesel::sql_types::{BigInt, Nullable};
874
875        #[derive(diesel::QueryableByName)]
876        struct MaxBlockRow {
877            #[diesel(sql_type = Nullable<BigInt>)]
878            max_block: Option<i64>,
879        }
880
881        let mut conn = self.pool.get()?;
882        let row = diesel::sql_query(
883            "SELECT COALESCE(
884                MIN(observed_block) FILTER (WHERE status <> 'submitted'),
885                MAX(observed_block)
886             ) AS max_block
887             FROM rsm_deposit_credits",
888        )
889        .get_result::<MaxBlockRow>(&mut conn)?;
890
891        row.max_block
892            .map(|value| {
893                u64::try_from(value).map_err(|_| {
894                    anyhow::anyhow!(
895                        "negative rsm_deposit_credits observed_block {} persisted in database",
896                        value
897                    )
898                })
899            })
900            .transpose()
901    }
902
903    /// Ensure a user_tiers row exists for the wallet that will receive deposit credit.
904    pub async fn ensure_observed_deposit_account(
905        &self,
906        account: &hypercall_types::WalletAddress,
907    ) -> Result<()> {
908        use diesel::sql_types::Binary;
909
910        let mut conn = self.pool.get()?;
911        diesel::sql_query(
912            "INSERT INTO user_tiers (wallet_address, tier, margin_mode, version)
913             VALUES ($1, 'tier2', 'standard', 1)
914             ON CONFLICT (wallet_address) DO NOTHING",
915        )
916        .bind::<Binary, _>(account)
917        .execute(&mut conn)?;
918        Ok(())
919    }
920
921    /// Claim an RSM deposit credit (idempotent upsert).
922    pub async fn claim_rsm_deposit_credit(
923        &self,
924        input: &hypercall_db::RsmDepositCreditClaimInput,
925    ) -> Result<hypercall_db::RsmDepositCreditClaimRecord> {
926        use diesel::sql_types::{BigInt, Binary, Text};
927
928        #[derive(diesel::QueryableByName)]
929        struct ClaimRow {
930            #[diesel(sql_type = Text)]
931            tx_hash: String,
932            #[diesel(sql_type = BigInt)]
933            log_index: i64,
934            #[diesel(sql_type = BigInt)]
935            observed_block: i64,
936            #[diesel(sql_type = Binary)]
937            account: hypercall_types::WalletAddress,
938            #[diesel(sql_type = Binary)]
939            token: hypercall_types::WalletAddress,
940            #[diesel(sql_type = Text)]
941            amount_wei: String,
942            #[diesel(sql_type = Text)]
943            credit_kind: String,
944            #[diesel(sql_type = Text)]
945            request_id: String,
946            #[diesel(sql_type = Text)]
947            status: String,
948        }
949
950        let mut conn = self.pool.get()?;
951        let row = diesel::sql_query(
952            "WITH inserted AS (
953                 INSERT INTO rsm_deposit_credits
954                     (tx_hash, log_index, observed_block, account, token, amount_wei, credit_kind, request_id)
955                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
956                 ON CONFLICT (tx_hash, log_index) DO NOTHING
957                 RETURNING tx_hash, log_index, observed_block, account, token, amount_wei, credit_kind, request_id, status
958             )
959             SELECT tx_hash, log_index, observed_block, account, token, amount_wei, credit_kind, request_id, status
960             FROM inserted
961             UNION ALL
962             SELECT tx_hash, log_index, observed_block, account, token, amount_wei, credit_kind, request_id, status
963             FROM rsm_deposit_credits
964             WHERE tx_hash = $1 AND log_index = $2
965             LIMIT 1",
966        )
967        .bind::<Text, _>(&input.tx_hash)
968        .bind::<BigInt, _>(input.log_index)
969        .bind::<BigInt, _>(input.observed_block)
970        .bind::<Binary, _>(&input.account)
971        .bind::<Binary, _>(&input.token)
972        .bind::<Text, _>(&input.amount_wei)
973        .bind::<Text, _>(&input.credit_kind)
974        .bind::<Text, _>(&input.request_id)
975        .get_result::<ClaimRow>(&mut conn)?;
976
977        Ok(hypercall_db::RsmDepositCreditClaimRecord {
978            tx_hash: row.tx_hash,
979            log_index: row.log_index,
980            observed_block: row.observed_block,
981            account: row.account,
982            token: row.token,
983            amount_wei: row.amount_wei,
984            credit_kind: row.credit_kind,
985            request_id: row.request_id,
986            status: row.status,
987        })
988    }
989
990    /// Mark an RSM deposit credit as submitted.
991    pub async fn mark_rsm_deposit_credit_submitted(&self, request_id: &str) -> Result<()> {
992        use diesel::sql_types::Text;
993
994        let mut conn = self.pool.get()?;
995        let updated = diesel::sql_query(
996            "UPDATE rsm_deposit_credits
997             SET status = 'submitted', error = NULL, updated_at = now()
998             WHERE request_id = $1",
999        )
1000        .bind::<Text, _>(request_id)
1001        .execute(&mut conn)?;
1002        if updated != 1 {
1003            anyhow::bail!(
1004                "expected 1 rsm_deposit_credits row updated for request_id {}, got {}",
1005                request_id,
1006                updated
1007            );
1008        }
1009        Ok(())
1010    }
1011
1012    /// Mark an RSM deposit credit as failed.
1013    pub async fn mark_rsm_deposit_credit_failed(
1014        &self,
1015        request_id: &str,
1016        error: &str,
1017    ) -> Result<()> {
1018        use diesel::sql_types::Text;
1019
1020        let mut conn = self.pool.get()?;
1021        let updated = diesel::sql_query(
1022            "UPDATE rsm_deposit_credits
1023             SET status = 'failed', error = $2, updated_at = now()
1024             WHERE request_id = $1",
1025        )
1026        .bind::<Text, _>(request_id)
1027        .bind::<Text, _>(error)
1028        .execute(&mut conn)?;
1029        if updated != 1 {
1030            anyhow::bail!(
1031                "expected 1 rsm_deposit_credits row failed for request_id {}, got {}",
1032                request_id,
1033                updated
1034            );
1035        }
1036        Ok(())
1037    }
1038
1039    /// Return the oldest pending Exchange.UsdcDeposit event for a HyperCore cash amount.
1040    pub async fn pending_rsm_usdc_deposit_for_amount(
1041        &self,
1042        amount_wei: &str,
1043    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1044        use diesel::sql_types::{BigInt, Binary, Text};
1045
1046        #[derive(diesel::QueryableByName)]
1047        struct Row {
1048            #[diesel(sql_type = Text)]
1049            request_id: String,
1050            #[diesel(sql_type = Binary)]
1051            account: Vec<u8>,
1052            #[diesel(sql_type = Binary)]
1053            token: Vec<u8>,
1054            #[diesel(sql_type = Text)]
1055            tx_hash: String,
1056            #[diesel(sql_type = BigInt)]
1057            log_index: i64,
1058            #[diesel(sql_type = Text)]
1059            amount_wei: String,
1060        }
1061
1062        let mut conn = self.pool.get()?;
1063        let rows = diesel::sql_query(
1064            "SELECT request_id, account, token, tx_hash, log_index, amount_wei
1065             FROM rsm_deposit_credits
1066             WHERE credit_kind = 'usdc'
1067               AND status = 'pending'
1068               AND amount_wei = $1
1069             ORDER BY observed_block ASC, log_index ASC
1070             LIMIT 2",
1071        )
1072        .bind::<Text, _>(amount_wei)
1073        .load::<Row>(&mut conn)?;
1074
1075        if rows.len() > 1 {
1076            anyhow::bail!(
1077                "ambiguous pending Exchange.UsdcDeposit attribution for amount_wei {}",
1078                amount_wei
1079            );
1080        }
1081
1082        rows.into_iter()
1083            .next()
1084            .map(|row| {
1085                let account = wallet_from_bytes(
1086                    row.account.as_slice(),
1087                    &format!("pending USDC deposit {}:{}", row.tx_hash, row.log_index),
1088                )?;
1089                let token = wallet_from_bytes(
1090                    row.token.as_slice(),
1091                    &format!(
1092                        "pending USDC deposit token {}:{}",
1093                        row.tx_hash, row.log_index
1094                    ),
1095                )?;
1096                Ok(hypercall_db::RsmUsdcDepositMatch {
1097                    request_id: row.request_id,
1098                    account,
1099                    tx_hash: row.tx_hash,
1100                    log_index: row.log_index,
1101                    amount_wei: row.amount_wei,
1102                    token,
1103                })
1104            })
1105            .transpose()
1106    }
1107
1108    /// Return the pending Exchange.UsdcDeposit event for a CoreWriter writer-action EVM tx hash.
1109    pub async fn pending_rsm_usdc_deposit_for_evm_tx_hash(
1110        &self,
1111        evm_tx_hash: &str,
1112        amount_wei: &str,
1113    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1114        use diesel::sql_types::{BigInt, Binary, Text};
1115
1116        #[derive(diesel::QueryableByName)]
1117        struct Row {
1118            #[diesel(sql_type = Text)]
1119            request_id: String,
1120            #[diesel(sql_type = Binary)]
1121            account: Vec<u8>,
1122            #[diesel(sql_type = Binary)]
1123            token: Vec<u8>,
1124            #[diesel(sql_type = Text)]
1125            tx_hash: String,
1126            #[diesel(sql_type = BigInt)]
1127            log_index: i64,
1128            #[diesel(sql_type = Text)]
1129            amount_wei: String,
1130        }
1131
1132        let mut conn = self.pool.get()?;
1133        let rows = diesel::sql_query(
1134            "SELECT request_id, account, token, tx_hash, log_index, amount_wei
1135             FROM rsm_deposit_credits
1136             WHERE credit_kind = 'usdc'
1137               AND status = 'pending'
1138               AND tx_hash = $1
1139             ORDER BY observed_block ASC, log_index ASC",
1140        )
1141        .bind::<Text, _>(evm_tx_hash)
1142        .load::<Row>(&mut conn)?;
1143
1144        if rows.is_empty() {
1145            return Ok(None);
1146        }
1147
1148        let amount_matched_rows: Vec<_> = rows
1149            .into_iter()
1150            .filter(|row| row.amount_wei == amount_wei)
1151            .collect();
1152
1153        if amount_matched_rows.is_empty() {
1154            anyhow::bail!(
1155                "CoreWriter evm_tx_hash {} matched pending Exchange.UsdcDeposit rows, but none had amount_wei {}",
1156                evm_tx_hash,
1157                amount_wei
1158            );
1159        }
1160
1161        if amount_matched_rows.len() > 1 {
1162            anyhow::bail!(
1163                "ambiguous pending Exchange.UsdcDeposit attribution for evm_tx_hash {} amount_wei {}",
1164                evm_tx_hash,
1165                amount_wei
1166            );
1167        }
1168
1169        let row = amount_matched_rows
1170            .into_iter()
1171            .next()
1172            .expect("amount_matched_rows is non-empty");
1173        let account = wallet_from_bytes(
1174            row.account.as_slice(),
1175            &format!("pending USDC deposit {}:{}", row.tx_hash, row.log_index),
1176        )?;
1177        let token = wallet_from_bytes(
1178            row.token.as_slice(),
1179            &format!(
1180                "pending USDC deposit token {}:{}",
1181                row.tx_hash, row.log_index
1182            ),
1183        )?;
1184        Ok(Some(hypercall_db::RsmUsdcDepositMatch {
1185            request_id: row.request_id,
1186            account,
1187            tx_hash: row.tx_hash,
1188            log_index: row.log_index,
1189            amount_wei: row.amount_wei,
1190            token,
1191        }))
1192    }
1193
1194    /// Return a PM liquidity Exchange deposit by CoreWriter writer-action EVM tx hash.
1195    pub async fn pm_liquidity_deposit_for_evm_tx_hash(
1196        &self,
1197        evm_tx_hash: &str,
1198        amount_wei: &str,
1199    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1200        use diesel::sql_types::{BigInt, Binary, Text};
1201
1202        #[derive(diesel::QueryableByName)]
1203        struct Row {
1204            #[diesel(sql_type = Text)]
1205            request_id: String,
1206            #[diesel(sql_type = Binary)]
1207            account: Vec<u8>,
1208            #[diesel(sql_type = Binary)]
1209            token: Vec<u8>,
1210            #[diesel(sql_type = Text)]
1211            tx_hash: String,
1212            #[diesel(sql_type = BigInt)]
1213            log_index: i64,
1214            #[diesel(sql_type = Text)]
1215            amount_wei: String,
1216        }
1217
1218        let mut conn = self.pool.get()?;
1219        let rows = diesel::sql_query(
1220            "SELECT request_id, account, token, tx_hash, log_index, amount_wei
1221             FROM rsm_deposit_credits
1222             WHERE credit_kind = 'pm_liquidity'
1223               AND tx_hash = $1
1224               AND matched_hypercore_event_hash IS NULL
1225             ORDER BY observed_block ASC, log_index ASC",
1226        )
1227        .bind::<Text, _>(evm_tx_hash)
1228        .load::<Row>(&mut conn)?;
1229
1230        if rows.is_empty() {
1231            return Ok(None);
1232        }
1233
1234        let amount_matched_rows: Vec<_> = rows
1235            .into_iter()
1236            .filter(|row| row.amount_wei == amount_wei)
1237            .collect();
1238
1239        if amount_matched_rows.is_empty() {
1240            anyhow::bail!(
1241                "CoreWriter evm_tx_hash {} matched PM liquidity deposit rows, but none had amount_wei {}",
1242                evm_tx_hash,
1243                amount_wei
1244            );
1245        }
1246
1247        if amount_matched_rows.len() > 1 {
1248            anyhow::bail!(
1249                "ambiguous PM liquidity deposit attribution for evm_tx_hash {} amount_wei {}",
1250                evm_tx_hash,
1251                amount_wei
1252            );
1253        }
1254
1255        let row = amount_matched_rows
1256            .into_iter()
1257            .next()
1258            .expect("amount_matched_rows is non-empty");
1259        rsm_deposit_match_from_row(
1260            row.request_id,
1261            row.account,
1262            row.token,
1263            row.tx_hash,
1264            row.log_index,
1265            row.amount_wei,
1266            "PM liquidity deposit",
1267        )
1268        .map(Some)
1269    }
1270
1271    /// Return the oldest PM liquidity Exchange deposit for a HyperCore cash amount.
1272    pub async fn pm_liquidity_deposit_for_amount(
1273        &self,
1274        amount_wei: &str,
1275    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1276        use diesel::sql_types::{BigInt, Binary, Text};
1277
1278        #[derive(diesel::QueryableByName)]
1279        struct Row {
1280            #[diesel(sql_type = Text)]
1281            request_id: String,
1282            #[diesel(sql_type = Binary)]
1283            account: Vec<u8>,
1284            #[diesel(sql_type = Binary)]
1285            token: Vec<u8>,
1286            #[diesel(sql_type = Text)]
1287            tx_hash: String,
1288            #[diesel(sql_type = BigInt)]
1289            log_index: i64,
1290            #[diesel(sql_type = Text)]
1291            amount_wei: String,
1292        }
1293
1294        let mut conn = self.pool.get()?;
1295        let rows = diesel::sql_query(
1296            "SELECT request_id, account, token, tx_hash, log_index, amount_wei
1297             FROM rsm_deposit_credits
1298             WHERE credit_kind = 'pm_liquidity'
1299               AND amount_wei = $1
1300               AND matched_hypercore_event_hash IS NULL
1301             ORDER BY observed_block ASC, log_index ASC
1302             LIMIT 2",
1303        )
1304        .bind::<Text, _>(amount_wei)
1305        .load::<Row>(&mut conn)?;
1306
1307        if rows.len() > 1 {
1308            anyhow::bail!(
1309                "ambiguous PM liquidity deposit attribution for amount_wei {}",
1310                amount_wei
1311            );
1312        }
1313
1314        rows.into_iter()
1315            .next()
1316            .map(|row| {
1317                rsm_deposit_match_from_row(
1318                    row.request_id,
1319                    row.account,
1320                    row.token,
1321                    row.tx_hash,
1322                    row.log_index,
1323                    row.amount_wei,
1324                    "PM liquidity deposit",
1325                )
1326            })
1327            .transpose()
1328    }
1329
1330    /// Mark a PM liquidity Exchange deposit as consumed by a HyperCore cash event.
1331    pub async fn mark_pm_liquidity_deposit_hypercore_matched(
1332        &self,
1333        request_id: &str,
1334        event_hash: &str,
1335    ) -> Result<()> {
1336        use diesel::sql_types::Text;
1337
1338        let mut conn = self.pool.get()?;
1339        let updated = diesel::sql_query(
1340            "UPDATE rsm_deposit_credits
1341             SET matched_hypercore_event_hash = $2, updated_at = now()
1342             WHERE request_id = $1
1343               AND credit_kind = 'pm_liquidity'
1344               AND (matched_hypercore_event_hash IS NULL OR matched_hypercore_event_hash = $2)",
1345        )
1346        .bind::<Text, _>(request_id)
1347        .bind::<Text, _>(event_hash)
1348        .execute(&mut conn)?;
1349        if updated != 1 {
1350            anyhow::bail!(
1351                "expected 1 PM liquidity deposit matched for request_id {}, got {}",
1352                request_id,
1353                updated
1354            );
1355        }
1356        Ok(())
1357    }
1358
1359    /// Return a pending Exchange.UsdcDeposit request for an already credited HyperCore event.
1360    pub async fn pending_rsm_usdc_deposit_for_credited_hypercore_event(
1361        &self,
1362        event_hash: &str,
1363        amount_wei: &str,
1364    ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1365        use diesel::sql_types::{BigInt, Binary, Text};
1366
1367        #[derive(diesel::QueryableByName)]
1368        struct Row {
1369            #[diesel(sql_type = Text)]
1370            request_id: String,
1371            #[diesel(sql_type = Binary)]
1372            account: Vec<u8>,
1373            #[diesel(sql_type = Binary)]
1374            token: Vec<u8>,
1375            #[diesel(sql_type = Text)]
1376            tx_hash: String,
1377            #[diesel(sql_type = BigInt)]
1378            log_index: i64,
1379            #[diesel(sql_type = Text)]
1380            amount_wei: String,
1381        }
1382
1383        let mut conn = self.pool.get()?;
1384        let rows = diesel::sql_query(
1385            "SELECT r.request_id, r.account, r.token, r.tx_hash, r.log_index, r.amount_wei
1386             FROM hypercore_cash_ledger_events h
1387             JOIN rsm_deposit_credits r
1388               ON r.account = h.wallet
1389              AND r.credit_kind = 'usdc'
1390              AND r.status = 'pending'
1391              AND r.amount_wei = $2
1392             WHERE h.event_hash = $1
1393               AND h.delta_type = 'pool_transfer'
1394               AND h.ledger_event_id IS NOT NULL
1395             ORDER BY r.observed_block ASC, r.log_index ASC
1396             LIMIT 2",
1397        )
1398        .bind::<Text, _>(event_hash)
1399        .bind::<Text, _>(amount_wei)
1400        .load::<Row>(&mut conn)?;
1401
1402        if rows.len() > 1 {
1403            anyhow::bail!(
1404                "ambiguous pending Exchange.UsdcDeposit replay attribution for HyperCore event {} amount_wei {}",
1405                event_hash,
1406                amount_wei
1407            );
1408        }
1409
1410        rows.into_iter()
1411            .next()
1412            .map(|row| {
1413                let account = wallet_from_bytes(
1414                    row.account.as_slice(),
1415                    &format!(
1416                        "pending replay USDC deposit {}:{}",
1417                        row.tx_hash, row.log_index
1418                    ),
1419                )?;
1420                let token = wallet_from_bytes(
1421                    row.token.as_slice(),
1422                    &format!(
1423                        "pending replay USDC deposit token {}:{}",
1424                        row.tx_hash, row.log_index
1425                    ),
1426                )?;
1427                Ok(hypercall_db::RsmUsdcDepositMatch {
1428                    request_id: row.request_id,
1429                    account,
1430                    tx_hash: row.tx_hash,
1431                    log_index: row.log_index,
1432                    amount_wei: row.amount_wei,
1433                    token,
1434                })
1435            })
1436            .transpose()
1437    }
1438
1439    /// Return the credited wallet for an already-applied HyperCore cash event.
1440    pub async fn credited_wallet_for_hypercore_cash_event(
1441        &self,
1442        event_hash: &str,
1443    ) -> Result<Option<hypercall_types::WalletAddress>> {
1444        use diesel::sql_types::{Binary, Text};
1445
1446        #[derive(diesel::QueryableByName)]
1447        struct Row {
1448            #[diesel(sql_type = Binary)]
1449            wallet: Vec<u8>,
1450        }
1451
1452        let mut conn = self.pool.get()?;
1453        let rows = diesel::sql_query(
1454            "SELECT wallet
1455             FROM hypercore_cash_ledger_events
1456             WHERE event_hash = $1
1457               AND delta_type = 'pool_transfer'
1458               AND ledger_event_id IS NOT NULL
1459             ORDER BY id ASC
1460             LIMIT 2",
1461        )
1462        .bind::<Text, _>(event_hash)
1463        .load::<Row>(&mut conn)?;
1464
1465        if rows.len() > 1 {
1466            anyhow::bail!(
1467                "duplicate credited wallet rows for hypercore cash event hash {}",
1468                event_hash
1469            );
1470        }
1471
1472        rows.into_iter()
1473            .next()
1474            .map(|row| wallet_from_bytes(row.wallet.as_slice(), "credited hypercore cash event"))
1475            .transpose()
1476    }
1477
1478    /// Return true when this HyperCore cash event is already recorded as non-crediting.
1479    pub async fn non_crediting_hypercore_cash_event(
1480        &self,
1481        event_hash: &str,
1482        amount_usdc: rust_decimal::Decimal,
1483    ) -> Result<bool> {
1484        use diesel::sql_types::{Bool, Numeric, Text};
1485
1486        #[derive(diesel::QueryableByName)]
1487        struct Row {
1488            #[diesel(sql_type = Bool)]
1489            exists: bool,
1490        }
1491
1492        let mut conn = self.pool.get()?;
1493        let row = diesel::sql_query(
1494            "SELECT EXISTS (
1495                 SELECT 1
1496                 FROM hypercore_cash_ledger_events
1497                 WHERE event_hash = $1
1498                   AND delta_type = 'pool_transfer'
1499                   AND amount_usdc = $2
1500                   AND status = 'submitted'
1501                   AND ledger_event_id IS NULL
1502             ) AS exists",
1503        )
1504        .bind::<Text, _>(event_hash)
1505        .bind::<Numeric, _>(amount_usdc)
1506        .get_result::<Row>(&mut conn)?;
1507        Ok(row.exists)
1508    }
1509
1510    /// Return recent cash deposit attribution rows for the admin panel.
1511    pub async fn list_recent_cash_deposit_monitoring_rows(
1512        &self,
1513        limit: i64,
1514        offset: i64,
1515    ) -> Result<Vec<hypercall_db::DepositMonitoringRow>> {
1516        use diesel::sql_types::{BigInt, Binary, Nullable, Text};
1517
1518        #[derive(diesel::QueryableByName)]
1519        struct Row {
1520            #[diesel(sql_type = Text)]
1521            source: String,
1522            #[diesel(sql_type = Text)]
1523            status: String,
1524            #[diesel(sql_type = Text)]
1525            correlation_status: String,
1526            #[diesel(sql_type = Binary)]
1527            wallet: Vec<u8>,
1528            #[diesel(sql_type = Text)]
1529            amount_usdc: String,
1530            #[diesel(sql_type = Text)]
1531            event_hash: String,
1532            #[diesel(sql_type = Nullable<Text>)]
1533            tx_hash: Option<String>,
1534            #[diesel(sql_type = Nullable<Text>)]
1535            request_id: Option<String>,
1536            #[diesel(sql_type = Nullable<BigInt>)]
1537            observed_block: Option<i64>,
1538            #[diesel(sql_type = Nullable<BigInt>)]
1539            log_index: Option<i64>,
1540            #[diesel(sql_type = Nullable<BigInt>)]
1541            ledger_event_id: Option<i64>,
1542            #[diesel(sql_type = Text)]
1543            created_at: String,
1544            #[diesel(sql_type = Text)]
1545            updated_at: String,
1546        }
1547
1548        let mut conn = self.pool.get()?;
1549        let rows = diesel::sql_query(
1550            "SELECT source, status, correlation_status, wallet, amount_usdc, event_hash, tx_hash, request_id,
1551                    observed_block, log_index, ledger_event_id, created_at, updated_at
1552             FROM (
1553                 SELECT
1554                     'hypercore'::text AS source,
1555                     status,
1556                     'cash_ledger_event'::text AS correlation_status,
1557                     wallet,
1558                     amount_usdc::text AS amount_usdc,
1559                     event_hash,
1560                     NULL::text AS tx_hash,
1561                     NULL::text AS request_id,
1562                     NULL::bigint AS observed_block,
1563                     NULL::bigint AS log_index,
1564                     ledger_event_id,
1565                     created_at::text AS created_at,
1566                     updated_at::text AS updated_at
1567                 FROM hypercore_cash_ledger_events
1568                 WHERE delta_type IN ('deposit', 'pool_transfer')
1569                 UNION ALL
1570                 SELECT
1571                     CASE WHEN credit_kind = 'usdc' THEN 'exchange_usdc' ELSE 'rsm_option' END AS source,
1572                     status,
1573                     CASE
1574                         WHEN credit_kind = 'usdc' AND status = 'submitted' THEN 'writer_action_correlated'
1575                         WHEN credit_kind = 'usdc' AND status = 'pending' THEN 'pending_writer_action'
1576                         WHEN credit_kind = 'usdc' AND status = 'failed' THEN 'failed'
1577                         ELSE 'not_applicable'
1578                     END AS correlation_status,
1579                     account AS wallet,
1580                     CASE
1581                         WHEN credit_kind = 'usdc'
1582                         THEN (amount_wei::numeric / 1000000)::text
1583                         ELSE amount_wei
1584                     END AS amount_usdc,
1585                     tx_hash AS event_hash,
1586                     tx_hash,
1587                     request_id,
1588                     observed_block,
1589                     log_index,
1590                     NULL::bigint AS ledger_event_id,
1591                     created_at::text AS created_at,
1592                     updated_at::text AS updated_at
1593                 FROM rsm_deposit_credits
1594             ) deposits
1595             ORDER BY created_at DESC, event_hash DESC
1596             LIMIT $1 OFFSET $2",
1597        )
1598        .bind::<BigInt, _>(limit)
1599        .bind::<BigInt, _>(offset)
1600        .load::<Row>(&mut conn)?;
1601
1602        rows.into_iter()
1603            .map(|row| {
1604                Ok(hypercall_db::DepositMonitoringRow {
1605                    source: row.source,
1606                    status: row.status,
1607                    correlation_status: row.correlation_status,
1608                    wallet: wallet_from_bytes(row.wallet.as_slice(), "deposit monitoring wallet")?,
1609                    amount_usdc: row.amount_usdc,
1610                    event_hash: row.event_hash,
1611                    tx_hash: row.tx_hash,
1612                    request_id: row.request_id,
1613                    observed_block: row.observed_block,
1614                    log_index: row.log_index,
1615                    ledger_event_id: row.ledger_event_id,
1616                    created_at: row.created_at,
1617                    updated_at: row.updated_at,
1618                })
1619            })
1620            .collect()
1621    }
1622
1623    /// Apply a HyperCore cash deposit (idempotent).
1624    pub async fn apply_hypercore_cash_deposit(
1625        &self,
1626        input: &hypercall_db::HypercoreCashLedgerApply,
1627    ) -> Result<hypercall_db::HypercoreCashLedgerApplyResult> {
1628        use diesel::sql_types::{BigInt, Binary, Nullable, Numeric, Text};
1629        use diesel::Connection;
1630        use diesel::OptionalExtension;
1631        use rust_decimal::Decimal;
1632
1633        if input.amount_usdc <= Decimal::ZERO {
1634            anyhow::bail!(
1635                "hypercore cash deposit amount must be positive for {} event_hash={}",
1636                input.wallet,
1637                input.event_hash
1638            );
1639        }
1640
1641        #[derive(diesel::QueryableByName)]
1642        struct InsertedEventRow {
1643            #[diesel(sql_type = BigInt)]
1644            id: i64,
1645        }
1646
1647        #[derive(diesel::QueryableByName)]
1648        struct LedgerEventRow {
1649            #[diesel(sql_type = BigInt)]
1650            id: i64,
1651        }
1652
1653        #[derive(diesel::QueryableByName)]
1654        struct ExistingEventRow {
1655            #[diesel(sql_type = BigInt)]
1656            id: i64,
1657            #[diesel(sql_type = Nullable<BigInt>)]
1658            ledger_event_id: Option<i64>,
1659            #[diesel(sql_type = Nullable<Numeric>)]
1660            balance_after: Option<Decimal>,
1661        }
1662
1663        let mut conn = self.pool.get()?;
1664        let row = conn.transaction::<hypercall_db::HypercoreCashLedgerApplyResult, diesel::result::Error, _>(|conn| {
1665            let inserted = diesel::sql_query(
1666                "INSERT INTO hypercore_cash_ledger_events
1667                    (wallet, event_hash, event_time_ms, delta_type, amount_usdc)
1668                 VALUES ($1, $2, $3, 'pool_transfer', $4)
1669                 ON CONFLICT (wallet, event_hash, delta_type) DO NOTHING
1670                 RETURNING id",
1671            )
1672            .bind::<Binary, _>(&input.wallet)
1673            .bind::<Text, _>(&input.event_hash)
1674            .bind::<BigInt, _>(input.event_time_ms)
1675            .bind::<Numeric, _>(input.amount_usdc)
1676            .get_result::<InsertedEventRow>(conn)
1677            .optional()?;
1678
1679            let Some(inserted) = inserted else {
1680                let existing = diesel::sql_query(
1681                    "SELECT id, ledger_event_id, balance_after
1682                     FROM hypercore_cash_ledger_events
1683                     WHERE wallet = $1 AND event_hash = $2 AND delta_type = 'pool_transfer'
1684                     FOR UPDATE",
1685                )
1686                .bind::<Binary, _>(&input.wallet)
1687                .bind::<Text, _>(&input.event_hash)
1688                .get_result::<ExistingEventRow>(conn)?;
1689
1690                if let Some(ledger_event_id) = existing.ledger_event_id {
1691                    return Ok(hypercall_db::HypercoreCashLedgerApplyResult {
1692                        applied: false,
1693                        balance_after: existing.balance_after,
1694                        ledger_event_id: Some(
1695                            u64::try_from(ledger_event_id).unwrap_or_else(|_| {
1696                                panic!(
1697                                    "STATE_CORRUPTION: negative ledger_event_id {} for hypercore deposit {}",
1698                                    ledger_event_id, input.event_hash
1699                                )
1700                            }),
1701                        ),
1702                    });
1703                }
1704
1705                let ledger = diesel::sql_query(
1706                    "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type)
1707                     VALUES ($1, $2, $3, 'deposit')
1708                     RETURNING id",
1709                )
1710                .bind::<Binary, _>(&input.wallet)
1711                .bind::<BigInt, _>(input.event_time_ms)
1712                .bind::<Numeric, _>(input.amount_usdc)
1713                .get_result::<LedgerEventRow>(conn)?;
1714
1715                diesel::sql_query(
1716                    "UPDATE hypercore_cash_ledger_events
1717                     SET ledger_event_id = $2,
1718                         status = 'submitted',
1719                         error = NULL,
1720                         updated_at = NOW()
1721                     WHERE id = $1 AND ledger_event_id IS NULL",
1722                )
1723                .bind::<BigInt, _>(existing.id)
1724                .bind::<BigInt, _>(ledger.id)
1725                .execute(conn)?;
1726
1727                return Ok(hypercall_db::HypercoreCashLedgerApplyResult {
1728                    applied: true,
1729                    balance_after: None,
1730                    ledger_event_id: Some(u64::try_from(ledger.id).unwrap_or_else(|_| {
1731                        panic!(
1732                            "STATE_CORRUPTION: negative ledger_event_id {} for hypercore deposit {}",
1733                            ledger.id, input.event_hash
1734                        )
1735                    })),
1736                });
1737            };
1738
1739            let ledger = diesel::sql_query(
1740                "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type)
1741                 VALUES ($1, $2, $3, 'deposit')
1742                 RETURNING id",
1743            )
1744            .bind::<Binary, _>(&input.wallet)
1745            .bind::<BigInt, _>(input.event_time_ms)
1746            .bind::<Numeric, _>(input.amount_usdc)
1747            .get_result::<LedgerEventRow>(conn)?;
1748
1749            diesel::sql_query(
1750                "UPDATE hypercore_cash_ledger_events
1751                 SET ledger_event_id = $2,
1752                     status = 'submitted',
1753                     error = NULL,
1754                     updated_at = NOW()
1755                 WHERE id = $1",
1756            )
1757            .bind::<BigInt, _>(inserted.id)
1758            .bind::<BigInt, _>(ledger.id)
1759            .execute(conn)?;
1760
1761            Ok(hypercall_db::HypercoreCashLedgerApplyResult {
1762                applied: true,
1763                balance_after: None,
1764                ledger_event_id: Some(u64::try_from(ledger.id).unwrap_or_else(|_| {
1765                    panic!(
1766                        "STATE_CORRUPTION: negative ledger_event_id {} for hypercore deposit {}",
1767                        ledger.id, input.event_hash
1768                    )
1769                })),
1770            })
1771        })?;
1772        Ok(row)
1773    }
1774
1775    /// Record an observed Exchange HyperCore deposit that must not credit engine cash.
1776    pub async fn record_hypercore_cash_deposit_non_crediting(
1777        &self,
1778        input: &hypercall_db::HypercoreCashLedgerApply,
1779    ) -> Result<()> {
1780        use diesel::sql_types::{BigInt, Binary, Numeric, Text};
1781        use rust_decimal::Decimal;
1782
1783        if input.amount_usdc <= Decimal::ZERO {
1784            anyhow::bail!(
1785                "non-crediting hypercore cash deposit amount must be positive for {} event_hash={}",
1786                input.wallet,
1787                input.event_hash
1788            );
1789        }
1790
1791        let mut conn = self.pool.get()?;
1792        diesel::sql_query(
1793            "INSERT INTO hypercore_cash_ledger_events
1794                (wallet, event_hash, event_time_ms, delta_type, amount_usdc, status, error)
1795             VALUES ($1, $2, $3, 'pool_transfer', $4, 'submitted', NULL)
1796             ON CONFLICT (wallet, event_hash, delta_type) DO UPDATE
1797             SET status = CASE
1798                     WHEN hypercore_cash_ledger_events.ledger_event_id IS NULL
1799                     THEN 'submitted'
1800                     ELSE hypercore_cash_ledger_events.status
1801                 END,
1802                 error = CASE
1803                     WHEN hypercore_cash_ledger_events.ledger_event_id IS NULL
1804                     THEN NULL
1805                     ELSE hypercore_cash_ledger_events.error
1806                 END,
1807                 updated_at = NOW()",
1808        )
1809        .bind::<Binary, _>(&input.wallet)
1810        .bind::<Text, _>(&input.event_hash)
1811        .bind::<BigInt, _>(input.event_time_ms)
1812        .bind::<Numeric, _>(input.amount_usdc)
1813        .execute(&mut conn)?;
1814        Ok(())
1815    }
1816
1817    /// Record a HyperCore cash deposit that cannot yet be credited because
1818    /// margin mode is unknown. The row pins the replay watermark until a later
1819    /// replay can apply the credit with a durable ledger sequence.
1820    pub async fn record_hypercore_cash_deposit_pending_margin_mode(
1821        &self,
1822        input: &hypercall_db::HypercoreCashLedgerApply,
1823    ) -> Result<()> {
1824        use diesel::sql_types::{BigInt, Binary, Numeric, Text};
1825        use rust_decimal::Decimal;
1826
1827        if input.amount_usdc <= Decimal::ZERO {
1828            anyhow::bail!(
1829                "hypercore cash deposit amount must be positive for {} event_hash={}",
1830                input.wallet,
1831                input.event_hash
1832            );
1833        }
1834
1835        let mut conn = self.pool.get()?;
1836        diesel::sql_query(
1837            "INSERT INTO hypercore_cash_ledger_events
1838                (wallet, event_hash, event_time_ms, delta_type, amount_usdc, status, error)
1839             VALUES ($1, $2, $3, 'pool_transfer', $4, 'pending_margin_mode', 'missing margin mode')
1840             ON CONFLICT (wallet, event_hash, delta_type) DO UPDATE
1841             SET status = CASE
1842                     WHEN hypercore_cash_ledger_events.ledger_event_id IS NULL
1843                     THEN 'pending_margin_mode'
1844                     ELSE hypercore_cash_ledger_events.status
1845                 END,
1846                 error = CASE
1847                     WHEN hypercore_cash_ledger_events.ledger_event_id IS NULL
1848                     THEN 'missing margin mode'
1849                     ELSE hypercore_cash_ledger_events.error
1850                 END,
1851                 updated_at = NOW()",
1852        )
1853        .bind::<Binary, _>(&input.wallet)
1854        .bind::<Text, _>(&input.event_hash)
1855        .bind::<BigInt, _>(input.event_time_ms)
1856        .bind::<Numeric, _>(input.amount_usdc)
1857        .execute(&mut conn)?;
1858        Ok(())
1859    }
1860
1861    /// Get the exchange-wide cash ledger watermark (max event_time_ms).
1862    pub fn get_exchange_cash_ledger_watermark_sync(&self) -> Result<Option<i64>> {
1863        use diesel::sql_types::{BigInt, Nullable};
1864
1865        #[derive(diesel::QueryableByName)]
1866        struct Row {
1867            #[diesel(sql_type = Nullable<BigInt>)]
1868            last_event_time_ms: Option<i64>,
1869        }
1870
1871        let mut conn = self.pool.get()?;
1872        // Only consider deposit/pool_transfer rows for the watermark. Local
1873        // withdraw reservations can have timestamps newer than the last
1874        // observed on-chain event and would push the watermark forward,
1875        // causing the HTTP catch-up to skip older unseen deposits.
1876        // Pending uncredited rows pin the watermark to their oldest timestamp
1877        // so a missing-margin-mode deposit stays replayable until credited.
1878        let row = diesel::sql_query(
1879            "WITH pending_uncredited AS (
1880                 SELECT MIN(event_time_ms)::bigint AS event_time_ms
1881                 FROM hypercore_cash_ledger_events
1882                 WHERE delta_type IN ('deposit', 'pool_transfer')
1883                   AND ledger_event_id IS NULL
1884                   AND status = 'pending_margin_mode'
1885             ),
1886             credited AS (
1887                 SELECT MAX(event_time_ms)::bigint AS event_time_ms
1888                 FROM hypercore_cash_ledger_events
1889                 WHERE delta_type IN ('deposit', 'pool_transfer')
1890                   AND (ledger_event_id IS NOT NULL OR status = 'submitted')
1891             )
1892             SELECT COALESCE(
1893                 (SELECT event_time_ms FROM pending_uncredited),
1894                 (SELECT event_time_ms FROM credited)
1895             ) AS last_event_time_ms",
1896        )
1897        .get_result::<Row>(&mut conn)?;
1898        Ok(row.last_event_time_ms)
1899    }
1900
1901    /// Load the current nonce state for a signer address (sync, used by startup code).
1902    pub async fn get_rsm_signer_nonce(
1903        &self,
1904        signer: &hypercall_types::WalletAddress,
1905    ) -> Result<Option<hypercall_db::RsmSignerNonceRecord>> {
1906        use diesel::OptionalExtension;
1907
1908        let mut conn = self.pool.get()?;
1909        let result = crate::schema::rsm_signer_nonces::table
1910            .filter(crate::schema::rsm_signer_nonces::signer_address.eq(signer))
1911            .first::<crate::models::RsmSignerNonceRecord>(&mut conn)
1912            .optional()?;
1913        Ok(result.map(Into::into))
1914    }
1915
1916    /// Look up an option instrument by its on-chain token address.
1917    pub async fn get_option_instrument_for_credit(
1918        &self,
1919        token: &hypercall_types::WalletAddress,
1920    ) -> Result<Option<hypercall_db::OptionInstrumentForCredit>> {
1921        use diesel::sql_types::{BigInt, Binary, Numeric, Text};
1922        use diesel::OptionalExtension;
1923
1924        #[derive(diesel::QueryableByName)]
1925        struct InstrumentRow {
1926            #[diesel(sql_type = Text)]
1927            id: String,
1928            #[diesel(sql_type = Text)]
1929            underlying: String,
1930            #[diesel(sql_type = BigInt)]
1931            expiry: i64,
1932            #[diesel(sql_type = Numeric)]
1933            strike: rust_decimal::Decimal,
1934            #[diesel(sql_type = Text)]
1935            option_type: String,
1936        }
1937
1938        let mut conn = self.pool.get()?;
1939        let row = diesel::sql_query(
1940            "SELECT id, underlying, expiry::bigint AS expiry, strike, option_type
1941             FROM instruments
1942             WHERE option_token_address = $1
1943             LIMIT 1",
1944        )
1945        .bind::<Binary, _>(token)
1946        .get_result::<InstrumentRow>(&mut conn)
1947        .optional()?;
1948
1949        Ok(row.map(|row| hypercall_db::OptionInstrumentForCredit {
1950            symbol: row.id,
1951            underlying: row.underlying,
1952            expiry: row.expiry,
1953            strike: row.strike,
1954            option_type: row.option_type,
1955        }))
1956    }
1957}
1958
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// The embedded migration set must expose the order_infos active-symbol
    /// index migration under the expected version, and it must run outside a
    /// transaction.
    #[test]
    fn replaces_multi_statement_concurrent_index_migration() {
        let migrations = HypercallMigrations
            .migrations()
            .expect("embedded migrations should load");
        let migration = migrations
            .iter()
            .find(|migration| {
                migration.name().to_string() == ORDER_INFOS_ACTIVE_SYMBOL_INDEX_MIGRATION
            })
            .expect("order_infos active symbol migration should be present");

        assert_eq!(migration.name().version().to_string(), "2026-06-01-000001");
        assert!(!migration.metadata().run_in_transaction());
    }

    /// Every `connect()` attempt must invoke the URL closure exactly once.
    #[test]
    fn dyn_connection_manager_calls_closure_on_connect() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let cc = call_count.clone();
        let mgr = DynConnectionManager {
            url_source: Arc::new(move || {
                cc.fetch_add(1, Ordering::SeqCst);
                "postgres://invalid-will-fail/db".to_string()
            }),
        };

        // connect() will fail (no real DB), but the closure should be called
        let _ = r2d2::ManageConnection::connect(&mgr);
        let _ = r2d2::ManageConnection::connect(&mgr);
        let _ = r2d2::ManageConnection::connect(&mgr);
        assert_eq!(call_count.load(Ordering::SeqCst), 3);
    }

    /// A URL swapped between two `connect()` attempts must be picked up by the
    /// second attempt.
    #[test]
    fn dyn_connection_manager_sees_url_changes() {
        let url = Arc::new(std::sync::Mutex::new(
            "postgres://first@host/db".to_string(),
        ));
        // Records every URL the manager asks for, so the test can actually
        // verify what each connect() attempt resolved.
        let seen = Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
        let url_clone = url.clone();
        let seen_clone = seen.clone();
        let mgr = DynConnectionManager {
            url_source: Arc::new(move || {
                let current = url_clone.lock().unwrap().clone();
                seen_clone.lock().unwrap().push(current.clone());
                current
            }),
        };

        // First call sees "first" (the connection itself is expected to fail)
        let _ = r2d2::ManageConnection::connect(&mgr);

        // Swap URL
        *url.lock().unwrap() = "postgres://second@host/db".to_string();

        // Second call sees "second" — verifying dynamic resolution
        let _ = r2d2::ManageConnection::connect(&mgr);

        let observed = seen.lock().unwrap().clone();
        assert_eq!(
            observed,
            vec!["postgres://first@host/db", "postgres://second@host/db"]
        );
    }

    /// Password auth yields a manager whose closure returns the static URL.
    #[test]
    fn dyn_connection_manager_from_password_auth() {
        let auth = DbAuthConfig::password("postgres://u:p@host/db");
        let mgr = DynConnectionManager::new(&auth);
        // Closure resolves the static URL
        let url = (mgr.url_source)();
        assert_eq!(url, "postgres://u:p@host/db");
    }

    /// With RDS IAM auth the closure must reflect the provider's latest
    /// stored URL after a rotation.
    #[cfg(feature = "rds-iam")]
    #[test]
    fn dyn_connection_manager_from_iam_auth_sees_rotation() {
        let parsed = crate::rds_iam::ParsedDbUrl::parse("postgres://user@host/db").unwrap();
        let mock_gen = crate::rds_iam::test_support::MockTokenGenerator::new(vec![]);
        let provider =
            crate::rds_iam::RdsIamTokenProvider::with_generator(parsed, Box::new(mock_gen));
        provider.store_url("postgres://user:token_v1@host/db".into());

        let auth = DbAuthConfig::rds_iam_with_provider(provider.clone());
        let mgr = DynConnectionManager::new(&auth);

        assert_eq!((mgr.url_source)(), "postgres://user:token_v1@host/db");

        provider.store_url("postgres://user:token_v2@host/db".into());
        assert_eq!((mgr.url_source)(), "postgres://user:token_v2@host/db");
    }
}