1use crate::db_auth::DbAuthConfig;
20use anyhow::{Context, Result};
21use diesel::backend::Backend;
22use diesel::connection::SimpleConnection;
23use diesel::migration::{
24 Migration, MigrationMetadata, MigrationName, MigrationSource, MigrationVersion,
25};
26use diesel::pg::Pg;
27use diesel::pg::PgConnection;
28use diesel::prelude::*;
29use diesel::r2d2::{self, ConnectionManager};
30use diesel::{Connection, RunQueryDsl};
31use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
32use std::fmt;
33use std::sync::Arc;
34use tracing::info;
35
36pub struct DynConnectionManager {
41 url_source: Arc<dyn Fn() -> String + Send + Sync>,
42}
43
44impl DynConnectionManager {
45 pub fn new(auth: &DbAuthConfig) -> Self {
48 let auth = auth.clone();
49 Self {
50 url_source: Arc::new(move || auth.current_url()),
51 }
52 }
53}
54
55impl r2d2::ManageConnection for DynConnectionManager {
56 type Connection = PgConnection;
57 type Error = r2d2::Error;
58
59 fn connect(&self) -> std::result::Result<PgConnection, r2d2::Error> {
60 let url = (self.url_source)();
61 PgConnection::establish(&url).map_err(r2d2::Error::ConnectionError)
62 }
63
64 fn is_valid(&self, conn: &mut PgConnection) -> std::result::Result<(), r2d2::Error> {
65 conn.batch_execute("SELECT 1")
66 .map_err(r2d2::Error::QueryError)
67 }
68
69 fn has_broken(&self, conn: &mut PgConnection) -> bool {
70 use diesel::r2d2::R2D2Connection;
71 std::thread::panicking() || conn.is_broken()
72 }
73}
74
75impl std::fmt::Debug for DynConnectionManager {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("DynConnectionManager").finish()
78 }
79}
80
/// r2d2 pool whose connections are opened through `DynConnectionManager`.
pub type DbPool = diesel::r2d2::Pool<DynConnectionManager>;

/// r2d2 pool built on Diesel's stock `ConnectionManager<PgConnection>`.
pub type LegacyDbPool = diesel::r2d2::Pool<ConnectionManager<PgConnection>>;
87
88fn wallet_from_bytes(bytes: &[u8], label: &str) -> Result<hypercall_types::WalletAddress> {
89 let arr: [u8; 20] = bytes.try_into().map_err(|_| {
90 anyhow::anyhow!(
91 "invalid {} address length {}, expected 20",
92 label,
93 bytes.len()
94 )
95 })?;
96 Ok(hypercall_types::WalletAddress::from(arr))
97}
98
99fn rsm_deposit_match_from_row(
100 request_id: String,
101 account: Vec<u8>,
102 token: Vec<u8>,
103 tx_hash: String,
104 log_index: i64,
105 amount_wei: String,
106 label: &str,
107) -> Result<hypercall_db::RsmUsdcDepositMatch> {
108 let account = wallet_from_bytes(
109 account.as_slice(),
110 &format!("{label} account {tx_hash}:{log_index}"),
111 )?;
112 let token = wallet_from_bytes(
113 token.as_slice(),
114 &format!("{label} token {tx_hash}:{log_index}"),
115 )?;
116 Ok(hypercall_db::RsmUsdcDepositMatch {
117 request_id,
118 account,
119 tx_hash,
120 log_index,
121 amount_wei,
122 token,
123 })
124}
125
126pub(crate) fn current_option_token_deployment() -> Result<hypercall_types::OptionTokenDeployment> {
129 use alloy::primitives::{Address, B256};
130 use std::str::FromStr;
131
132 let registry_str = std::env::var("OPTION_REGISTRY_ADDRESS").map_err(|_| {
133 anyhow::anyhow!(
134 "Option token derivation requires OPTION_REGISTRY_ADDRESS from backend contracts config"
135 )
136 })?;
137 let init_hash_str =
138 std::env::var("OPTION_TOKEN_BEACON_PROXY_INIT_CODE_HASH").map_err(|_| {
139 anyhow::anyhow!(
140 "Option token derivation requires OPTION_TOKEN_BEACON_PROXY_INIT_CODE_HASH from backend contracts config"
141 )
142 })?;
143
144 let option_registry = Address::from_str(registry_str.trim()).map_err(|e| {
145 anyhow::anyhow!(
146 "Option token derivation requires valid OPTION_REGISTRY_ADDRESS: {}",
147 e
148 )
149 })?;
150 let beacon_proxy_init_code_hash = B256::from_str(init_hash_str.trim()).map_err(|e| {
151 anyhow::anyhow!(
152 "Option token derivation requires valid OPTION_TOKEN_BEACON_PROXY_INIT_CODE_HASH: {}",
153 e
154 )
155 })?;
156
157 Ok(hypercall_types::OptionTokenDeployment::new(
158 option_registry,
159 beacon_proxy_init_code_hash,
160 ))
161}
162
/// Migrations embedded at compile time from the `../../migrations` directory.
pub const MIGRATIONS: EmbeddedMigrations = diesel_migrations::embed_migrations!("../../migrations");

/// Name of the one embedded migration that `HypercallMigrations` replaces
/// with a `SplitStatementMigration`.
const ORDER_INFOS_ACTIVE_SYMBOL_INDEX_MIGRATION: &str =
    "2026-06-01-000001_order_infos_active_symbol_index";
168
169struct HypercallMigrations;
175
176impl MigrationSource<Pg> for HypercallMigrations {
177 fn migrations(&self) -> diesel::migration::Result<Vec<Box<dyn Migration<Pg>>>> {
178 MIGRATIONS
179 .migrations()?
180 .into_iter()
181 .map(|migration| {
182 if migration.name().to_string() == ORDER_INFOS_ACTIVE_SYMBOL_INDEX_MIGRATION {
183 Ok(
184 Box::new(SplitStatementMigration::order_infos_active_symbol_index())
185 as Box<dyn Migration<Pg>>,
186 )
187 } else {
188 Ok(migration)
189 }
190 })
191 .collect()
192 }
193}
194
195#[derive(Debug)]
196struct SplitStatementMigration {
197 name: StaticMigrationName,
198 metadata: NonTransactionalMigrationMetadata,
199 up_sql: &'static str,
200 down_sql: &'static str,
201}
202
203impl SplitStatementMigration {
204 fn order_infos_active_symbol_index() -> Self {
205 Self {
206 name: StaticMigrationName(ORDER_INFOS_ACTIVE_SYMBOL_INDEX_MIGRATION),
207 metadata: NonTransactionalMigrationMetadata,
208 up_sql: include_str!(
209 "../../../migrations/2026-06-01-000001_order_infos_active_symbol_index/up.sql"
210 ),
211 down_sql: include_str!(
212 "../../../migrations/2026-06-01-000001_order_infos_active_symbol_index/down.sql"
213 ),
214 }
215 }
216
217 fn execute_statements<DB: Backend>(
218 &self,
219 conn: &mut dyn diesel::connection::BoxableConnection<DB>,
220 sql: &str,
221 ) -> diesel::migration::Result<()> {
222 for statement in sql.split(';').map(str::trim).filter(|s| !s.is_empty()) {
223 conn.batch_execute(statement).map_err(|error| {
224 let message = format!(
225 "failed to run statement in migration {}: {}",
226 self.name, error
227 );
228 Box::new(std::io::Error::new(std::io::ErrorKind::Other, message))
229 as Box<dyn std::error::Error + Send + Sync>
230 })?;
231 }
232 Ok(())
233 }
234}
235
236impl Migration<Pg> for SplitStatementMigration {
237 fn run(
238 &self,
239 conn: &mut dyn diesel::connection::BoxableConnection<Pg>,
240 ) -> diesel::migration::Result<()> {
241 self.execute_statements(conn, self.up_sql)
242 }
243
244 fn revert(
245 &self,
246 conn: &mut dyn diesel::connection::BoxableConnection<Pg>,
247 ) -> diesel::migration::Result<()> {
248 self.execute_statements(conn, self.down_sql)
249 }
250
251 fn metadata(&self) -> &dyn MigrationMetadata {
252 &self.metadata
253 }
254
255 fn name(&self) -> &dyn MigrationName {
256 &self.name
257 }
258}
259
260#[derive(Debug)]
261struct StaticMigrationName(&'static str);
262
263impl MigrationName for StaticMigrationName {
264 fn version(&self) -> MigrationVersion<'_> {
265 self.0.split('_').next().unwrap_or(self.0).into()
266 }
267}
268
269impl fmt::Display for StaticMigrationName {
270 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271 f.write_str(self.0)
272 }
273}
274
/// Migration metadata that always reports "do not run in a transaction".
#[derive(Debug)]
struct NonTransactionalMigrationMetadata;

impl MigrationMetadata for NonTransactionalMigrationMetadata {
    /// Always `false`.
    fn run_in_transaction(&self) -> bool {
        false
    }
}
283
284#[derive(Debug)]
286struct PgTimeoutCustomizer {
287 statement_timeout_ms: u32,
288 lock_timeout_ms: u32,
289}
290
291impl r2d2::CustomizeConnection<PgConnection, r2d2::Error> for PgTimeoutCustomizer {
292 fn on_acquire(&self, conn: &mut PgConnection) -> std::result::Result<(), r2d2::Error> {
293 conn.batch_execute(&format!(
294 "SET statement_timeout = '{}'; SET lock_timeout = '{}'",
295 self.statement_timeout_ms, self.lock_timeout_ms,
296 ))
297 .map_err(r2d2::Error::QueryError)
298 }
299}
300
/// Returns a log-safe rendering of a database URL with the credentials
/// replaced by `***`.
///
/// The credentials end at the LAST `@` inside the URL authority (the text
/// between `://` and the first `/`, `?` or `#`). Cutting at the first `@`
/// would leak the tail of a password that itself contains an unescaped `@`.
///
/// Output is `postgres://***@<rest>` when such an `@` exists, where `<rest>`
/// is everything after it (host, port, path, query). Otherwise it is
/// `postgres://***`, so nothing from an unrecognised URL is echoed.
///
/// Query parameters are passed through unchanged; a secret supplied there
/// (e.g. `?password=...`) is not redacted by this function.
fn redact_database_url(url: &str) -> String {
    // Skip the scheme so the "//" of "://" is not mistaken for the path start.
    let rest = url.find("://").map_or(url, |idx| &url[idx + 3..]);
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());

    match rest[..authority_end].rfind('@') {
        Some(at) => format!("postgres://***@{}", &rest[at + 1..]),
        None => "postgres://***".to_string(),
    }
}
307
308pub fn build_db_pool(
311 auth: &DbAuthConfig,
312 pool_max: u32,
313 statement_timeout_ms: u32,
314 lock_timeout_ms: u32,
315) -> Result<DbPool> {
316 let manager = DynConnectionManager::new(auth);
317 r2d2::Pool::builder()
318 .max_size(pool_max)
319 .min_idle(Some(0))
320 .connection_timeout(std::time::Duration::from_secs(5))
321 .idle_timeout(Some(std::time::Duration::from_secs(300)))
322 .connection_customizer(Box::new(PgTimeoutCustomizer {
323 statement_timeout_ms,
324 lock_timeout_ms,
325 }))
326 .build(manager)
327 .with_context(|| "Failed to create connection pool")
328}
329
/// Database access handle; all methods obtain connections from the shared
/// pool held here.
pub struct DatabaseHandler {
    /// Shared connection pool.
    pool: Arc<DbPool>,
}
339
340impl DatabaseHandler {
341 pub fn new(database_url: &str) -> Result<Self> {
343 Self::new_with_pool_size(database_url, 3)
344 }
345
346 pub fn new_with_pool_size(database_url: &str, pool_max: u32) -> Result<Self> {
348 Self::new_with_auth(DbAuthConfig::password(database_url), pool_max)
349 }
350
351 pub fn new_with_auth(auth: DbAuthConfig, pool_max: u32) -> Result<Self> {
353 let handler = Self::build_pool(&auth, pool_max, 30_000, 10_000)?;
354 handler.run_migrations().with_context(|| {
355 format!(
356 "Failed to run migrations for database (auth={:?}): {}",
357 auth,
358 redact_database_url(&auth.current_url())
359 )
360 })?;
361 Ok(handler)
362 }
363
364 pub fn new_with_timeouts(
367 database_url: &str,
368 pool_max: u32,
369 statement_timeout_ms: u32,
370 lock_timeout_ms: u32,
371 ) -> Result<Self> {
372 Self::new_with_timeouts_auth(
373 DbAuthConfig::password(database_url),
374 pool_max,
375 statement_timeout_ms,
376 lock_timeout_ms,
377 )
378 }
379
380 pub fn new_with_timeouts_auth(
382 auth: DbAuthConfig,
383 pool_max: u32,
384 statement_timeout_ms: u32,
385 lock_timeout_ms: u32,
386 ) -> Result<Self> {
387 let redacted = redact_database_url(&auth.current_url());
388 info!(
389 "Initializing DatabaseHandler (custom timeouts: stmt={}ms, lock={}ms): {}",
390 statement_timeout_ms, lock_timeout_ms, redacted
391 );
392 Self::build_pool(&auth, pool_max, statement_timeout_ms, lock_timeout_ms)
393 }
394
395 pub fn new_readonly(database_url: &str, pool_max: u32) -> Result<Self> {
397 Self::new_readonly_auth(DbAuthConfig::password(database_url), pool_max)
398 }
399
400 pub fn new_readonly_auth(auth: DbAuthConfig, pool_max: u32) -> Result<Self> {
402 let redacted = redact_database_url(&auth.current_url());
403 info!(
404 "Initializing readonly DatabaseHandler (migrations skipped): {}",
405 redacted
406 );
407 Self::build_pool(&auth, pool_max, 30_000, 10_000)
408 }
409
410 pub fn with_pool(pool: Arc<DbPool>) -> Result<Self> {
412 info!("Creating DatabaseHandler with existing pool");
413 let handler = Self { pool };
414 handler
415 .run_migrations()
416 .with_context(|| "Failed to run migrations")?;
417 Ok(handler)
418 }
419
420 pub fn with_pool_no_migrations(pool: Arc<DbPool>) -> Self {
422 info!("Creating DatabaseHandler with existing pool (migrations skipped)");
423 Self { pool }
424 }
425
426 pub fn pool(&self) -> Arc<DbPool> {
428 self.pool.clone()
429 }
430
431 pub fn get_pool(&self) -> DbPool {
433 (*self.pool).clone()
434 }
435
436 pub fn run_migrations(&self) -> Result<()> {
438 let mut conn = self.pool.get()?;
439
440 match conn.run_pending_migrations(HypercallMigrations) {
441 Ok(migrations) => {
442 info!(
443 "Database migrations completed successfully: {} migrations applied",
444 migrations.len()
445 );
446 }
447 Err(e) => return Err(anyhow::anyhow!("Migration failed: {}", e)),
448 }
449
450 Self::ensure_enum_values(&mut conn)?;
451 Self::ensure_directive_action_key_enum(&mut conn)?;
452 Self::ensure_directive_outbox_wallet_address(&mut conn)?;
453 Self::ensure_real_liquidation_schema(&mut conn)?;
454
455 Ok(())
456 }
457
/// Brings the liquidation tables and the RSM signer tables up to the shape
/// this code expects, using `IF NOT EXISTS` DDL.
///
/// Statements run one after another on `conn` and the first failure aborts the
/// rest. No explicit transaction is opened here, so statements that already
/// succeeded are presumably not rolled back — confirm against how `conn` is
/// used by the caller.
///
/// # Errors
/// Each statement's failure is returned with a message naming the step.
fn ensure_real_liquidation_schema(conn: &mut PgConnection) -> Result<()> {
    // liquidation_states: add any columns that are missing.
    diesel::sql_query(
        "ALTER TABLE liquidation_states
         ADD COLUMN IF NOT EXISTS liquidation_mode TEXT,
         ADD COLUMN IF NOT EXISTS target_equity NUMERIC,
         ADD COLUMN IF NOT EXISTS escalation_deadline BIGINT,
         ADD COLUMN IF NOT EXISTS last_reprice_at BIGINT,
         ADD COLUMN IF NOT EXISTS partial_order_request_ids JSONB,
         ADD COLUMN IF NOT EXISTS partial_order_client_ids JSONB,
         ADD COLUMN IF NOT EXISTS partial_bonus_bps INTEGER,
         ADD COLUMN IF NOT EXISTS request_id TEXT,
         ADD COLUMN IF NOT EXISTS tx_hash TEXT,
         ADD COLUMN IF NOT EXISTS chain_start_time BIGINT,
         ADD COLUMN IF NOT EXISTS margin_needed NUMERIC,
         ADD COLUMN IF NOT EXISTS stop_request_id TEXT,
         ADD COLUMN IF NOT EXISTS stop_tx_hash TEXT,
         ADD COLUMN IF NOT EXISTS resolved_winner BYTEA,
         ADD COLUMN IF NOT EXISTS resolved_bonus NUMERIC,
         ADD COLUMN IF NOT EXISTS resolution_tx_hash TEXT,
         ADD COLUMN IF NOT EXISTS last_observed_block BIGINT,
         ADD COLUMN IF NOT EXISTS updated_at_ms BIGINT",
    )
    .execute(conn)
    .map_err(|e| anyhow::anyhow!("Failed to ensure liquidation_states schema: {}", e))?;

    // liquidation_history: add any columns that are missing. `maintenance_margin`
    // is added as nullable here and tightened to NOT NULL further down.
    diesel::sql_query(
        "ALTER TABLE liquidation_history
         ADD COLUMN IF NOT EXISTS liquidation_mode TEXT,
         ADD COLUMN IF NOT EXISTS maintenance_margin NUMERIC,
         ADD COLUMN IF NOT EXISTS request_id TEXT,
         ADD COLUMN IF NOT EXISTS tx_hash TEXT,
         ADD COLUMN IF NOT EXISTS margin_needed NUMERIC,
         ADD COLUMN IF NOT EXISTS winner_address BYTEA,
         ADD COLUMN IF NOT EXISTS bonus NUMERIC,
         ADD COLUMN IF NOT EXISTS details JSONB NOT NULL DEFAULT '{}'::jsonb",
    )
    .execute(conn)
    .map_err(|e| anyhow::anyhow!("Failed to ensure liquidation_history schema: {}", e))?;

    // Backfill rows that have no maintenance_margin yet.
    // NOTE(review): if `equity` or `mm_required` is NULL the result stays NULL
    // and the NOT NULL step below would fail — confirm those columns cannot be NULL.
    diesel::sql_query(
        "UPDATE liquidation_history
         SET maintenance_margin = equity - mm_required
         WHERE maintenance_margin IS NULL",
    )
    .execute(conn)
    .map_err(|e| {
        anyhow::anyhow!(
            "Failed to backfill liquidation_history.maintenance_margin: {}",
            e
        )
    })?;

    // Must run after the backfill above.
    diesel::sql_query(
        "ALTER TABLE liquidation_history
         ALTER COLUMN maintenance_margin SET NOT NULL",
    )
    .execute(conn)
    .map_err(|e| {
        anyhow::anyhow!(
            "Failed to enforce liquidation_history.maintenance_margin not-null: {}",
            e
        )
    })?;

    // Unique index over the listed history columns; the COALESCE calls map
    // NULL ids to '' so they take part in the uniqueness comparison.
    diesel::sql_query(
        "CREATE UNIQUE INDEX IF NOT EXISTS idx_liquidation_history_replay_guard
         ON liquidation_history (
             wallet_address, previous_state, new_state, timestamp,
             COALESCE(auction_id, ''), COALESCE(request_id, ''), COALESCE(tx_hash, '')
         )",
    )
    .execute(conn)
    .map_err(|e| anyhow::anyhow!("Failed to ensure liquidation_history replay index: {}", e))?;

    // liquidation_auctions: add any columns that are missing.
    diesel::sql_query(
        "ALTER TABLE liquidation_auctions
         ADD COLUMN IF NOT EXISTS target_equity NUMERIC,
         ADD COLUMN IF NOT EXISTS request_id TEXT,
         ADD COLUMN IF NOT EXISTS tx_hash TEXT,
         ADD COLUMN IF NOT EXISTS chain_start_time BIGINT,
         ADD COLUMN IF NOT EXISTS margin_needed NUMERIC,
         ADD COLUMN IF NOT EXISTS stop_request_id TEXT,
         ADD COLUMN IF NOT EXISTS stop_tx_hash TEXT,
         ADD COLUMN IF NOT EXISTS bonus NUMERIC,
         ADD COLUMN IF NOT EXISTS last_observed_block BIGINT",
    )
    .execute(conn)
    .map_err(|e| anyhow::anyhow!("Failed to ensure liquidation_auctions schema: {}", e))?;

    // RSM signer nonce table, keyed by signer address.
    diesel::sql_query(
        "CREATE TABLE IF NOT EXISTS rsm_signer_nonces (
             signer_address BYTEA PRIMARY KEY,
             next_nonce BIGINT NOT NULL,
             last_synced_nonce BIGINT,
             created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
             updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
         )",
    )
    .execute(conn)
    .map_err(|e| anyhow::anyhow!("Failed to ensure rsm_signer_nonces table: {}", e))?;

    // RSM signer request table, keyed by request id.
    diesel::sql_query(
        "CREATE TABLE IF NOT EXISTS rsm_signer_requests (
             request_id TEXT PRIMARY KEY,
             signer_address BYTEA NOT NULL,
             account_address BYTEA NOT NULL,
             action BYTEA NOT NULL,
             nonce BIGINT NOT NULL,
             status TEXT NOT NULL,
             directive BYTEA,
             signature TEXT,
             created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
             updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
         )",
    )
    .execute(conn)
    .map_err(|e| anyhow::anyhow!("Failed to ensure rsm_signer_requests table: {}", e))?;

    // A table that existed before this column was introduced gets it added as
    // nullable first.
    diesel::sql_query(
        "ALTER TABLE rsm_signer_requests ADD COLUMN IF NOT EXISTS signer_address BYTEA",
    )
    .execute(conn)
    .map_err(|e| {
        anyhow::anyhow!(
            "Failed to ensure rsm_signer_requests signer_address column: {}",
            e
        )
    })?;

    // Destructive: rows without a signer address are deleted so that the
    // NOT NULL constraint below can be applied.
    diesel::sql_query("DELETE FROM rsm_signer_requests WHERE signer_address IS NULL")
        .execute(conn)
        .map_err(|e| {
            anyhow::anyhow!(
                "Failed to remove rsm_signer_requests rows missing signer_address: {}",
                e
            )
        })?;

    diesel::sql_query(
        "ALTER TABLE rsm_signer_requests ALTER COLUMN signer_address SET NOT NULL",
    )
    .execute(conn)
    .map_err(|e| {
        anyhow::anyhow!(
            "Failed to require rsm_signer_requests signer_address: {}",
            e
        )
    })?;

    Ok(())
}
609
610 fn build_pool(
615 auth: &DbAuthConfig,
616 pool_max: u32,
617 statement_timeout_ms: u32,
618 lock_timeout_ms: u32,
619 ) -> Result<Self> {
620 let redacted = redact_database_url(&auth.current_url());
621 info!(
622 "Initializing DatabaseHandler (auth={:?}): {}",
623 auth, redacted
624 );
625 let manager = DynConnectionManager::new(auth);
626 let pool = r2d2::Pool::builder()
627 .max_size(pool_max)
628 .min_idle(Some(0))
629 .connection_timeout(std::time::Duration::from_secs(5))
630 .idle_timeout(Some(std::time::Duration::from_secs(300)))
631 .connection_customizer(Box::new(PgTimeoutCustomizer {
632 statement_timeout_ms,
633 lock_timeout_ms,
634 }))
635 .build(manager)
636 .with_context(|| {
637 format!(
638 "Failed to create connection pool for database: {}",
639 redacted
640 )
641 })?;
642
643 Ok(Self {
644 pool: Arc::new(pool),
645 })
646 }
647
648 fn ensure_enum_values(conn: &mut PgConnection) -> Result<()> {
650 let stmts = [
651 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'RfqExecute'",
652 "ALTER TYPE engine_event_type ADD VALUE IF NOT EXISTS 'RfqFilled'",
653 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'ReplaceOrder' AFTER 'CancelOrder'",
654 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'PriceUpdate'",
655 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'IvUpdate'",
656 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'TierUpdate'",
657 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'HypercorePositionUpdate'",
658 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'MmpConfigUpdate'",
659 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'LiquidationBonusUpdate'",
660 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'ApproveAgent'",
661 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'RevokeAgent'",
662 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'NonceAdvance'",
663 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'HypercoreEquityUpdate'",
664 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'OptionDepositUpdate'",
665 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'OptionWithdrawalUpdate'",
666 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'CashWithdrawalUpdate'",
667 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'RecordPmVaultDeposit'",
668 "ALTER TYPE engine_command_type ADD VALUE IF NOT EXISTS 'RequestPmVaultWithdrawal'",
669 ];
670
671 for stmt in &stmts {
672 diesel::sql_query(*stmt)
673 .execute(conn)
674 .map_err(|e| anyhow::anyhow!("Failed to ensure enum value: {} - {}", stmt, e))?;
675 }
676
677 info!("Ensured all required Postgres enum values exist");
678 Ok(())
679 }
680
/// Ensures the `directive_action_key` enum exists with all expected labels and
/// that `directive_outbox.action_key` uses it.
///
/// Three phases, in order:
/// 1. create the enum type if `pg_type` has no entry named
///    `directive_action_key`;
/// 2. add each expected label with `ADD VALUE IF NOT EXISTS`, which covers an
///    enum that already existed with fewer labels;
/// 3. convert `directive_outbox.action_key` to the enum type by casting
///    through `text`.
///
/// # Errors
/// The first failing statement is returned with context naming the phase.
fn ensure_directive_action_key_enum(conn: &mut PgConnection) -> Result<()> {
    // Phase 1: create the type only when it is absent. The label list here
    // must stay in sync with `values` below.
    diesel::sql_query(
        r#"
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1
                FROM pg_type
                WHERE typname = 'directive_action_key'
            ) THEN
                CREATE TYPE directive_action_key AS ENUM (
                    'hl_limit_order',
                    'hl_cancel_by_oid',
                    'hl_cancel_by_cloid',
                    'hc_update_api_wallet',
                    'hl_send_asset',
                    'hc_transfer_option',
                    'rsm_hl_limit_order',
                    'rsm_hl_cancel_by_oid',
                    'rsm_hl_cancel_by_cloid',
                    'rsm_hl_send_asset',
                    'system_credit_token',
                    'system_credit_option',
                    'system_start_liquidation',
                    'system_stop_liquidation',
                    'system_withdraw_token'
                );
            END IF;
        END $$;
        "#,
    )
    .execute(conn)
    .context("Failed to ensure directive_action_key enum type exists")?;

    // Phase 2: top up labels on a type that already existed.
    let values = [
        "hl_limit_order",
        "hl_cancel_by_oid",
        "hl_cancel_by_cloid",
        "hc_update_api_wallet",
        "hl_send_asset",
        "hc_transfer_option",
        "rsm_hl_limit_order",
        "rsm_hl_cancel_by_oid",
        "rsm_hl_cancel_by_cloid",
        "rsm_hl_send_asset",
        "system_credit_token",
        "system_credit_option",
        "system_start_liquidation",
        "system_stop_liquidation",
        "system_withdraw_token",
    ];
    for value in values {
        // `value` comes only from the fixed list above, so interpolating it
        // into the statement text does not expose it to outside input.
        diesel::sql_query(format!(
            "ALTER TYPE directive_action_key ADD VALUE IF NOT EXISTS '{}'",
            value
        ))
        .execute(conn)
        .with_context(|| format!("Failed to ensure directive_action_key enum value {value}"))?;
    }

    // Phase 3: switch the column to the enum type.
    // NOTE(review): this ALTER is issued on every call, even when the column
    // already has this type — confirm Postgres treats that case as cheap
    // (no table rewrite / long exclusive lock) on large tables.
    diesel::sql_query(
        r#"
        ALTER TABLE directive_outbox
        ALTER COLUMN action_key TYPE directive_action_key
        USING action_key::text::directive_action_key
        "#,
    )
    .execute(conn)
    .context("Failed to ensure directive_outbox.action_key uses directive_action_key")?;

    info!("Ensured directive_action_key enum and directive_outbox schema exist");
    Ok(())
}
756
757 fn ensure_directive_outbox_wallet_address(conn: &mut PgConnection) -> Result<()> {
758 diesel::sql_query(
759 r#"
760 ALTER TABLE directive_outbox
761 ADD COLUMN IF NOT EXISTS wallet_address BYTEA
762 "#,
763 )
764 .execute(conn)
765 .context("Failed to ensure directive_outbox.wallet_address column exists")?;
766
767 diesel::sql_query(
768 r#"
769 UPDATE directive_outbox
770 SET wallet_address = account_address
771 WHERE wallet_address IS NULL
772 "#,
773 )
774 .execute(conn)
775 .context("Failed to backfill directive_outbox.wallet_address")?;
776
777 diesel::sql_query(
778 r#"
779 ALTER TABLE directive_outbox
780 ALTER COLUMN wallet_address SET NOT NULL
781 "#,
782 )
783 .execute(conn)
784 .context("Failed to ensure directive_outbox.wallet_address is NOT NULL")?;
785
786 diesel::sql_query(
787 r#"
788 CREATE INDEX IF NOT EXISTS idx_directive_outbox_withdrawal_wallet_created
789 ON directive_outbox (wallet_address, created_ts_ms DESC)
790 "#,
791 )
792 .execute(conn)
793 .context("Failed to ensure directive_outbox wallet history index")?;
794
795 info!("Ensured directive_outbox.wallet_address schema exists");
796 Ok(())
797 }
798
799 pub(crate) fn order_update_status_to_db(
801 status: hypercall_types::OrderUpdateStatus,
802 ) -> &'static str {
803 match status {
804 hypercall_types::OrderUpdateStatus::Acked => "ACKED",
805 hypercall_types::OrderUpdateStatus::Open => "OPEN",
806 hypercall_types::OrderUpdateStatus::PartiallyFilled => "PARTIALLY_FILLED",
807 hypercall_types::OrderUpdateStatus::Filled => "FILLED",
808 hypercall_types::OrderUpdateStatus::Canceled => "CANCELED",
809 hypercall_types::OrderUpdateStatus::Rejected => "REJECTED",
810 }
811 }
812
813 pub(crate) fn observe_diesel_option_token_violation(err: &diesel::result::Error) {
815 if let diesel::result::Error::DatabaseError(
816 diesel::result::DatabaseErrorKind::UniqueViolation,
817 ref info,
818 ) = err
819 {
820 if info
821 .constraint_name()
822 .is_some_and(|c| c.contains("option_token_address"))
823 {
824 tracing::warn!(
825 "Option token address unique constraint violation (likely re-listing): {}",
826 info.message()
827 );
828 metrics::counter!("ht_diesel_option_token_violation_total").increment(1);
829 }
830 }
831 }
832
833 pub fn update_snapshot_boundary_sync(&self, last_command_id: i64) -> Result<()> {
837 let mut conn = self.pool.get()?;
838 diesel::sql_query(
839 "INSERT INTO engine_snapshot_boundary (id, last_command_id, updated_at) \
840 VALUES (1, $1, now()) \
841 ON CONFLICT (id) DO UPDATE SET last_command_id = $1, updated_at = now()",
842 )
843 .bind::<diesel::sql_types::BigInt, _>(last_command_id)
844 .execute(&mut conn)?;
845 Ok(())
846 }
847
848 pub fn directive_outbox_exists_sync(&self, directive_id: &str) -> Result<bool> {
852 use diesel::sql_types::Text;
853
854 #[derive(diesel::QueryableByName)]
855 struct ExistsRow {
856 #[diesel(sql_type = diesel::sql_types::Bool)]
857 exists: bool,
858 }
859
860 let mut conn = self.pool.get()?;
861 let row = diesel::sql_query(
862 "SELECT EXISTS (
863 SELECT 1 FROM directive_outbox WHERE directive_id = $1
864 ) AS exists",
865 )
866 .bind::<Text, _>(directive_id)
867 .get_result::<ExistsRow>(&mut conn)?;
868 Ok(row.exists)
869 }
870
871 pub async fn get_max_rsm_deposit_credit_observed_block(&self) -> Result<Option<u64>> {
873 use diesel::sql_types::{BigInt, Nullable};
874
875 #[derive(diesel::QueryableByName)]
876 struct MaxBlockRow {
877 #[diesel(sql_type = Nullable<BigInt>)]
878 max_block: Option<i64>,
879 }
880
881 let mut conn = self.pool.get()?;
882 let row = diesel::sql_query(
883 "SELECT COALESCE(
884 MIN(observed_block) FILTER (WHERE status <> 'submitted'),
885 MAX(observed_block)
886 ) AS max_block
887 FROM rsm_deposit_credits",
888 )
889 .get_result::<MaxBlockRow>(&mut conn)?;
890
891 row.max_block
892 .map(|value| {
893 u64::try_from(value).map_err(|_| {
894 anyhow::anyhow!(
895 "negative rsm_deposit_credits observed_block {} persisted in database",
896 value
897 )
898 })
899 })
900 .transpose()
901 }
902
903 pub async fn ensure_observed_deposit_account(
905 &self,
906 account: &hypercall_types::WalletAddress,
907 ) -> Result<()> {
908 use diesel::sql_types::Binary;
909
910 let mut conn = self.pool.get()?;
911 diesel::sql_query(
912 "INSERT INTO user_tiers (wallet_address, tier, margin_mode, version)
913 VALUES ($1, 'tier2', 'standard', 1)
914 ON CONFLICT (wallet_address) DO NOTHING",
915 )
916 .bind::<Binary, _>(account)
917 .execute(&mut conn)?;
918 Ok(())
919 }
920
921 pub async fn claim_rsm_deposit_credit(
923 &self,
924 input: &hypercall_db::RsmDepositCreditClaimInput,
925 ) -> Result<hypercall_db::RsmDepositCreditClaimRecord> {
926 use diesel::sql_types::{BigInt, Binary, Text};
927
928 #[derive(diesel::QueryableByName)]
929 struct ClaimRow {
930 #[diesel(sql_type = Text)]
931 tx_hash: String,
932 #[diesel(sql_type = BigInt)]
933 log_index: i64,
934 #[diesel(sql_type = BigInt)]
935 observed_block: i64,
936 #[diesel(sql_type = Binary)]
937 account: hypercall_types::WalletAddress,
938 #[diesel(sql_type = Binary)]
939 token: hypercall_types::WalletAddress,
940 #[diesel(sql_type = Text)]
941 amount_wei: String,
942 #[diesel(sql_type = Text)]
943 credit_kind: String,
944 #[diesel(sql_type = Text)]
945 request_id: String,
946 #[diesel(sql_type = Text)]
947 status: String,
948 }
949
950 let mut conn = self.pool.get()?;
951 let row = diesel::sql_query(
952 "WITH inserted AS (
953 INSERT INTO rsm_deposit_credits
954 (tx_hash, log_index, observed_block, account, token, amount_wei, credit_kind, request_id)
955 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
956 ON CONFLICT (tx_hash, log_index) DO NOTHING
957 RETURNING tx_hash, log_index, observed_block, account, token, amount_wei, credit_kind, request_id, status
958 )
959 SELECT tx_hash, log_index, observed_block, account, token, amount_wei, credit_kind, request_id, status
960 FROM inserted
961 UNION ALL
962 SELECT tx_hash, log_index, observed_block, account, token, amount_wei, credit_kind, request_id, status
963 FROM rsm_deposit_credits
964 WHERE tx_hash = $1 AND log_index = $2
965 LIMIT 1",
966 )
967 .bind::<Text, _>(&input.tx_hash)
968 .bind::<BigInt, _>(input.log_index)
969 .bind::<BigInt, _>(input.observed_block)
970 .bind::<Binary, _>(&input.account)
971 .bind::<Binary, _>(&input.token)
972 .bind::<Text, _>(&input.amount_wei)
973 .bind::<Text, _>(&input.credit_kind)
974 .bind::<Text, _>(&input.request_id)
975 .get_result::<ClaimRow>(&mut conn)?;
976
977 Ok(hypercall_db::RsmDepositCreditClaimRecord {
978 tx_hash: row.tx_hash,
979 log_index: row.log_index,
980 observed_block: row.observed_block,
981 account: row.account,
982 token: row.token,
983 amount_wei: row.amount_wei,
984 credit_kind: row.credit_kind,
985 request_id: row.request_id,
986 status: row.status,
987 })
988 }
989
990 pub async fn mark_rsm_deposit_credit_submitted(&self, request_id: &str) -> Result<()> {
992 use diesel::sql_types::Text;
993
994 let mut conn = self.pool.get()?;
995 let updated = diesel::sql_query(
996 "UPDATE rsm_deposit_credits
997 SET status = 'submitted', error = NULL, updated_at = now()
998 WHERE request_id = $1",
999 )
1000 .bind::<Text, _>(request_id)
1001 .execute(&mut conn)?;
1002 if updated != 1 {
1003 anyhow::bail!(
1004 "expected 1 rsm_deposit_credits row updated for request_id {}, got {}",
1005 request_id,
1006 updated
1007 );
1008 }
1009 Ok(())
1010 }
1011
1012 pub async fn mark_rsm_deposit_credit_failed(
1014 &self,
1015 request_id: &str,
1016 error: &str,
1017 ) -> Result<()> {
1018 use diesel::sql_types::Text;
1019
1020 let mut conn = self.pool.get()?;
1021 let updated = diesel::sql_query(
1022 "UPDATE rsm_deposit_credits
1023 SET status = 'failed', error = $2, updated_at = now()
1024 WHERE request_id = $1",
1025 )
1026 .bind::<Text, _>(request_id)
1027 .bind::<Text, _>(error)
1028 .execute(&mut conn)?;
1029 if updated != 1 {
1030 anyhow::bail!(
1031 "expected 1 rsm_deposit_credits row failed for request_id {}, got {}",
1032 request_id,
1033 updated
1034 );
1035 }
1036 Ok(())
1037 }
1038
1039 pub async fn pending_rsm_usdc_deposit_for_amount(
1041 &self,
1042 amount_wei: &str,
1043 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1044 use diesel::sql_types::{BigInt, Binary, Text};
1045
1046 #[derive(diesel::QueryableByName)]
1047 struct Row {
1048 #[diesel(sql_type = Text)]
1049 request_id: String,
1050 #[diesel(sql_type = Binary)]
1051 account: Vec<u8>,
1052 #[diesel(sql_type = Binary)]
1053 token: Vec<u8>,
1054 #[diesel(sql_type = Text)]
1055 tx_hash: String,
1056 #[diesel(sql_type = BigInt)]
1057 log_index: i64,
1058 #[diesel(sql_type = Text)]
1059 amount_wei: String,
1060 }
1061
1062 let mut conn = self.pool.get()?;
1063 let rows = diesel::sql_query(
1064 "SELECT request_id, account, token, tx_hash, log_index, amount_wei
1065 FROM rsm_deposit_credits
1066 WHERE credit_kind = 'usdc'
1067 AND status = 'pending'
1068 AND amount_wei = $1
1069 ORDER BY observed_block ASC, log_index ASC
1070 LIMIT 2",
1071 )
1072 .bind::<Text, _>(amount_wei)
1073 .load::<Row>(&mut conn)?;
1074
1075 if rows.len() > 1 {
1076 anyhow::bail!(
1077 "ambiguous pending Exchange.UsdcDeposit attribution for amount_wei {}",
1078 amount_wei
1079 );
1080 }
1081
1082 rows.into_iter()
1083 .next()
1084 .map(|row| {
1085 let account = wallet_from_bytes(
1086 row.account.as_slice(),
1087 &format!("pending USDC deposit {}:{}", row.tx_hash, row.log_index),
1088 )?;
1089 let token = wallet_from_bytes(
1090 row.token.as_slice(),
1091 &format!(
1092 "pending USDC deposit token {}:{}",
1093 row.tx_hash, row.log_index
1094 ),
1095 )?;
1096 Ok(hypercall_db::RsmUsdcDepositMatch {
1097 request_id: row.request_id,
1098 account,
1099 tx_hash: row.tx_hash,
1100 log_index: row.log_index,
1101 amount_wei: row.amount_wei,
1102 token,
1103 })
1104 })
1105 .transpose()
1106 }
1107
1108 pub async fn pending_rsm_usdc_deposit_for_evm_tx_hash(
1110 &self,
1111 evm_tx_hash: &str,
1112 amount_wei: &str,
1113 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1114 use diesel::sql_types::{BigInt, Binary, Text};
1115
1116 #[derive(diesel::QueryableByName)]
1117 struct Row {
1118 #[diesel(sql_type = Text)]
1119 request_id: String,
1120 #[diesel(sql_type = Binary)]
1121 account: Vec<u8>,
1122 #[diesel(sql_type = Binary)]
1123 token: Vec<u8>,
1124 #[diesel(sql_type = Text)]
1125 tx_hash: String,
1126 #[diesel(sql_type = BigInt)]
1127 log_index: i64,
1128 #[diesel(sql_type = Text)]
1129 amount_wei: String,
1130 }
1131
1132 let mut conn = self.pool.get()?;
1133 let rows = diesel::sql_query(
1134 "SELECT request_id, account, token, tx_hash, log_index, amount_wei
1135 FROM rsm_deposit_credits
1136 WHERE credit_kind = 'usdc'
1137 AND status = 'pending'
1138 AND tx_hash = $1
1139 ORDER BY observed_block ASC, log_index ASC",
1140 )
1141 .bind::<Text, _>(evm_tx_hash)
1142 .load::<Row>(&mut conn)?;
1143
1144 if rows.is_empty() {
1145 return Ok(None);
1146 }
1147
1148 let amount_matched_rows: Vec<_> = rows
1149 .into_iter()
1150 .filter(|row| row.amount_wei == amount_wei)
1151 .collect();
1152
1153 if amount_matched_rows.is_empty() {
1154 anyhow::bail!(
1155 "CoreWriter evm_tx_hash {} matched pending Exchange.UsdcDeposit rows, but none had amount_wei {}",
1156 evm_tx_hash,
1157 amount_wei
1158 );
1159 }
1160
1161 if amount_matched_rows.len() > 1 {
1162 anyhow::bail!(
1163 "ambiguous pending Exchange.UsdcDeposit attribution for evm_tx_hash {} amount_wei {}",
1164 evm_tx_hash,
1165 amount_wei
1166 );
1167 }
1168
1169 let row = amount_matched_rows
1170 .into_iter()
1171 .next()
1172 .expect("amount_matched_rows is non-empty");
1173 let account = wallet_from_bytes(
1174 row.account.as_slice(),
1175 &format!("pending USDC deposit {}:{}", row.tx_hash, row.log_index),
1176 )?;
1177 let token = wallet_from_bytes(
1178 row.token.as_slice(),
1179 &format!(
1180 "pending USDC deposit token {}:{}",
1181 row.tx_hash, row.log_index
1182 ),
1183 )?;
1184 Ok(Some(hypercall_db::RsmUsdcDepositMatch {
1185 request_id: row.request_id,
1186 account,
1187 tx_hash: row.tx_hash,
1188 log_index: row.log_index,
1189 amount_wei: row.amount_wei,
1190 token,
1191 }))
1192 }
1193
1194 pub async fn pm_liquidity_deposit_for_evm_tx_hash(
1196 &self,
1197 evm_tx_hash: &str,
1198 amount_wei: &str,
1199 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1200 use diesel::sql_types::{BigInt, Binary, Text};
1201
1202 #[derive(diesel::QueryableByName)]
1203 struct Row {
1204 #[diesel(sql_type = Text)]
1205 request_id: String,
1206 #[diesel(sql_type = Binary)]
1207 account: Vec<u8>,
1208 #[diesel(sql_type = Binary)]
1209 token: Vec<u8>,
1210 #[diesel(sql_type = Text)]
1211 tx_hash: String,
1212 #[diesel(sql_type = BigInt)]
1213 log_index: i64,
1214 #[diesel(sql_type = Text)]
1215 amount_wei: String,
1216 }
1217
1218 let mut conn = self.pool.get()?;
1219 let rows = diesel::sql_query(
1220 "SELECT request_id, account, token, tx_hash, log_index, amount_wei
1221 FROM rsm_deposit_credits
1222 WHERE credit_kind = 'pm_liquidity'
1223 AND tx_hash = $1
1224 AND matched_hypercore_event_hash IS NULL
1225 ORDER BY observed_block ASC, log_index ASC",
1226 )
1227 .bind::<Text, _>(evm_tx_hash)
1228 .load::<Row>(&mut conn)?;
1229
1230 if rows.is_empty() {
1231 return Ok(None);
1232 }
1233
1234 let amount_matched_rows: Vec<_> = rows
1235 .into_iter()
1236 .filter(|row| row.amount_wei == amount_wei)
1237 .collect();
1238
1239 if amount_matched_rows.is_empty() {
1240 anyhow::bail!(
1241 "CoreWriter evm_tx_hash {} matched PM liquidity deposit rows, but none had amount_wei {}",
1242 evm_tx_hash,
1243 amount_wei
1244 );
1245 }
1246
1247 if amount_matched_rows.len() > 1 {
1248 anyhow::bail!(
1249 "ambiguous PM liquidity deposit attribution for evm_tx_hash {} amount_wei {}",
1250 evm_tx_hash,
1251 amount_wei
1252 );
1253 }
1254
1255 let row = amount_matched_rows
1256 .into_iter()
1257 .next()
1258 .expect("amount_matched_rows is non-empty");
1259 rsm_deposit_match_from_row(
1260 row.request_id,
1261 row.account,
1262 row.token,
1263 row.tx_hash,
1264 row.log_index,
1265 row.amount_wei,
1266 "PM liquidity deposit",
1267 )
1268 .map(Some)
1269 }
1270
1271 pub async fn pm_liquidity_deposit_for_amount(
1273 &self,
1274 amount_wei: &str,
1275 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1276 use diesel::sql_types::{BigInt, Binary, Text};
1277
1278 #[derive(diesel::QueryableByName)]
1279 struct Row {
1280 #[diesel(sql_type = Text)]
1281 request_id: String,
1282 #[diesel(sql_type = Binary)]
1283 account: Vec<u8>,
1284 #[diesel(sql_type = Binary)]
1285 token: Vec<u8>,
1286 #[diesel(sql_type = Text)]
1287 tx_hash: String,
1288 #[diesel(sql_type = BigInt)]
1289 log_index: i64,
1290 #[diesel(sql_type = Text)]
1291 amount_wei: String,
1292 }
1293
1294 let mut conn = self.pool.get()?;
1295 let rows = diesel::sql_query(
1296 "SELECT request_id, account, token, tx_hash, log_index, amount_wei
1297 FROM rsm_deposit_credits
1298 WHERE credit_kind = 'pm_liquidity'
1299 AND amount_wei = $1
1300 AND matched_hypercore_event_hash IS NULL
1301 ORDER BY observed_block ASC, log_index ASC
1302 LIMIT 2",
1303 )
1304 .bind::<Text, _>(amount_wei)
1305 .load::<Row>(&mut conn)?;
1306
1307 if rows.len() > 1 {
1308 anyhow::bail!(
1309 "ambiguous PM liquidity deposit attribution for amount_wei {}",
1310 amount_wei
1311 );
1312 }
1313
1314 rows.into_iter()
1315 .next()
1316 .map(|row| {
1317 rsm_deposit_match_from_row(
1318 row.request_id,
1319 row.account,
1320 row.token,
1321 row.tx_hash,
1322 row.log_index,
1323 row.amount_wei,
1324 "PM liquidity deposit",
1325 )
1326 })
1327 .transpose()
1328 }
1329
1330 pub async fn mark_pm_liquidity_deposit_hypercore_matched(
1332 &self,
1333 request_id: &str,
1334 event_hash: &str,
1335 ) -> Result<()> {
1336 use diesel::sql_types::Text;
1337
1338 let mut conn = self.pool.get()?;
1339 let updated = diesel::sql_query(
1340 "UPDATE rsm_deposit_credits
1341 SET matched_hypercore_event_hash = $2, updated_at = now()
1342 WHERE request_id = $1
1343 AND credit_kind = 'pm_liquidity'
1344 AND (matched_hypercore_event_hash IS NULL OR matched_hypercore_event_hash = $2)",
1345 )
1346 .bind::<Text, _>(request_id)
1347 .bind::<Text, _>(event_hash)
1348 .execute(&mut conn)?;
1349 if updated != 1 {
1350 anyhow::bail!(
1351 "expected 1 PM liquidity deposit matched for request_id {}, got {}",
1352 request_id,
1353 updated
1354 );
1355 }
1356 Ok(())
1357 }
1358
1359 pub async fn pending_rsm_usdc_deposit_for_credited_hypercore_event(
1361 &self,
1362 event_hash: &str,
1363 amount_wei: &str,
1364 ) -> Result<Option<hypercall_db::RsmUsdcDepositMatch>> {
1365 use diesel::sql_types::{BigInt, Binary, Text};
1366
1367 #[derive(diesel::QueryableByName)]
1368 struct Row {
1369 #[diesel(sql_type = Text)]
1370 request_id: String,
1371 #[diesel(sql_type = Binary)]
1372 account: Vec<u8>,
1373 #[diesel(sql_type = Binary)]
1374 token: Vec<u8>,
1375 #[diesel(sql_type = Text)]
1376 tx_hash: String,
1377 #[diesel(sql_type = BigInt)]
1378 log_index: i64,
1379 #[diesel(sql_type = Text)]
1380 amount_wei: String,
1381 }
1382
1383 let mut conn = self.pool.get()?;
1384 let rows = diesel::sql_query(
1385 "SELECT r.request_id, r.account, r.token, r.tx_hash, r.log_index, r.amount_wei
1386 FROM hypercore_cash_ledger_events h
1387 JOIN rsm_deposit_credits r
1388 ON r.account = h.wallet
1389 AND r.credit_kind = 'usdc'
1390 AND r.status = 'pending'
1391 AND r.amount_wei = $2
1392 WHERE h.event_hash = $1
1393 AND h.delta_type = 'pool_transfer'
1394 AND h.ledger_event_id IS NOT NULL
1395 ORDER BY r.observed_block ASC, r.log_index ASC
1396 LIMIT 2",
1397 )
1398 .bind::<Text, _>(event_hash)
1399 .bind::<Text, _>(amount_wei)
1400 .load::<Row>(&mut conn)?;
1401
1402 if rows.len() > 1 {
1403 anyhow::bail!(
1404 "ambiguous pending Exchange.UsdcDeposit replay attribution for HyperCore event {} amount_wei {}",
1405 event_hash,
1406 amount_wei
1407 );
1408 }
1409
1410 rows.into_iter()
1411 .next()
1412 .map(|row| {
1413 let account = wallet_from_bytes(
1414 row.account.as_slice(),
1415 &format!(
1416 "pending replay USDC deposit {}:{}",
1417 row.tx_hash, row.log_index
1418 ),
1419 )?;
1420 let token = wallet_from_bytes(
1421 row.token.as_slice(),
1422 &format!(
1423 "pending replay USDC deposit token {}:{}",
1424 row.tx_hash, row.log_index
1425 ),
1426 )?;
1427 Ok(hypercall_db::RsmUsdcDepositMatch {
1428 request_id: row.request_id,
1429 account,
1430 tx_hash: row.tx_hash,
1431 log_index: row.log_index,
1432 amount_wei: row.amount_wei,
1433 token,
1434 })
1435 })
1436 .transpose()
1437 }
1438
1439 pub async fn credited_wallet_for_hypercore_cash_event(
1441 &self,
1442 event_hash: &str,
1443 ) -> Result<Option<hypercall_types::WalletAddress>> {
1444 use diesel::sql_types::{Binary, Text};
1445
1446 #[derive(diesel::QueryableByName)]
1447 struct Row {
1448 #[diesel(sql_type = Binary)]
1449 wallet: Vec<u8>,
1450 }
1451
1452 let mut conn = self.pool.get()?;
1453 let rows = diesel::sql_query(
1454 "SELECT wallet
1455 FROM hypercore_cash_ledger_events
1456 WHERE event_hash = $1
1457 AND delta_type = 'pool_transfer'
1458 AND ledger_event_id IS NOT NULL
1459 ORDER BY id ASC
1460 LIMIT 2",
1461 )
1462 .bind::<Text, _>(event_hash)
1463 .load::<Row>(&mut conn)?;
1464
1465 if rows.len() > 1 {
1466 anyhow::bail!(
1467 "duplicate credited wallet rows for hypercore cash event hash {}",
1468 event_hash
1469 );
1470 }
1471
1472 rows.into_iter()
1473 .next()
1474 .map(|row| wallet_from_bytes(row.wallet.as_slice(), "credited hypercore cash event"))
1475 .transpose()
1476 }
1477
1478 pub async fn non_crediting_hypercore_cash_event(
1480 &self,
1481 event_hash: &str,
1482 amount_usdc: rust_decimal::Decimal,
1483 ) -> Result<bool> {
1484 use diesel::sql_types::{Bool, Numeric, Text};
1485
1486 #[derive(diesel::QueryableByName)]
1487 struct Row {
1488 #[diesel(sql_type = Bool)]
1489 exists: bool,
1490 }
1491
1492 let mut conn = self.pool.get()?;
1493 let row = diesel::sql_query(
1494 "SELECT EXISTS (
1495 SELECT 1
1496 FROM hypercore_cash_ledger_events
1497 WHERE event_hash = $1
1498 AND delta_type = 'pool_transfer'
1499 AND amount_usdc = $2
1500 AND status = 'submitted'
1501 AND ledger_event_id IS NULL
1502 ) AS exists",
1503 )
1504 .bind::<Text, _>(event_hash)
1505 .bind::<Numeric, _>(amount_usdc)
1506 .get_result::<Row>(&mut conn)?;
1507 Ok(row.exists)
1508 }
1509
1510 pub async fn list_recent_cash_deposit_monitoring_rows(
1512 &self,
1513 limit: i64,
1514 offset: i64,
1515 ) -> Result<Vec<hypercall_db::DepositMonitoringRow>> {
1516 use diesel::sql_types::{BigInt, Binary, Nullable, Text};
1517
1518 #[derive(diesel::QueryableByName)]
1519 struct Row {
1520 #[diesel(sql_type = Text)]
1521 source: String,
1522 #[diesel(sql_type = Text)]
1523 status: String,
1524 #[diesel(sql_type = Text)]
1525 correlation_status: String,
1526 #[diesel(sql_type = Binary)]
1527 wallet: Vec<u8>,
1528 #[diesel(sql_type = Text)]
1529 amount_usdc: String,
1530 #[diesel(sql_type = Text)]
1531 event_hash: String,
1532 #[diesel(sql_type = Nullable<Text>)]
1533 tx_hash: Option<String>,
1534 #[diesel(sql_type = Nullable<Text>)]
1535 request_id: Option<String>,
1536 #[diesel(sql_type = Nullable<BigInt>)]
1537 observed_block: Option<i64>,
1538 #[diesel(sql_type = Nullable<BigInt>)]
1539 log_index: Option<i64>,
1540 #[diesel(sql_type = Nullable<BigInt>)]
1541 ledger_event_id: Option<i64>,
1542 #[diesel(sql_type = Text)]
1543 created_at: String,
1544 #[diesel(sql_type = Text)]
1545 updated_at: String,
1546 }
1547
1548 let mut conn = self.pool.get()?;
1549 let rows = diesel::sql_query(
1550 "SELECT source, status, correlation_status, wallet, amount_usdc, event_hash, tx_hash, request_id,
1551 observed_block, log_index, ledger_event_id, created_at, updated_at
1552 FROM (
1553 SELECT
1554 'hypercore'::text AS source,
1555 status,
1556 'cash_ledger_event'::text AS correlation_status,
1557 wallet,
1558 amount_usdc::text AS amount_usdc,
1559 event_hash,
1560 NULL::text AS tx_hash,
1561 NULL::text AS request_id,
1562 NULL::bigint AS observed_block,
1563 NULL::bigint AS log_index,
1564 ledger_event_id,
1565 created_at::text AS created_at,
1566 updated_at::text AS updated_at
1567 FROM hypercore_cash_ledger_events
1568 WHERE delta_type IN ('deposit', 'pool_transfer')
1569 UNION ALL
1570 SELECT
1571 CASE WHEN credit_kind = 'usdc' THEN 'exchange_usdc' ELSE 'rsm_option' END AS source,
1572 status,
1573 CASE
1574 WHEN credit_kind = 'usdc' AND status = 'submitted' THEN 'writer_action_correlated'
1575 WHEN credit_kind = 'usdc' AND status = 'pending' THEN 'pending_writer_action'
1576 WHEN credit_kind = 'usdc' AND status = 'failed' THEN 'failed'
1577 ELSE 'not_applicable'
1578 END AS correlation_status,
1579 account AS wallet,
1580 CASE
1581 WHEN credit_kind = 'usdc'
1582 THEN (amount_wei::numeric / 1000000)::text
1583 ELSE amount_wei
1584 END AS amount_usdc,
1585 tx_hash AS event_hash,
1586 tx_hash,
1587 request_id,
1588 observed_block,
1589 log_index,
1590 NULL::bigint AS ledger_event_id,
1591 created_at::text AS created_at,
1592 updated_at::text AS updated_at
1593 FROM rsm_deposit_credits
1594 ) deposits
1595 ORDER BY created_at DESC, event_hash DESC
1596 LIMIT $1 OFFSET $2",
1597 )
1598 .bind::<BigInt, _>(limit)
1599 .bind::<BigInt, _>(offset)
1600 .load::<Row>(&mut conn)?;
1601
1602 rows.into_iter()
1603 .map(|row| {
1604 Ok(hypercall_db::DepositMonitoringRow {
1605 source: row.source,
1606 status: row.status,
1607 correlation_status: row.correlation_status,
1608 wallet: wallet_from_bytes(row.wallet.as_slice(), "deposit monitoring wallet")?,
1609 amount_usdc: row.amount_usdc,
1610 event_hash: row.event_hash,
1611 tx_hash: row.tx_hash,
1612 request_id: row.request_id,
1613 observed_block: row.observed_block,
1614 log_index: row.log_index,
1615 ledger_event_id: row.ledger_event_id,
1616 created_at: row.created_at,
1617 updated_at: row.updated_at,
1618 })
1619 })
1620 .collect()
1621 }
1622
1623 pub async fn apply_hypercore_cash_deposit(
1625 &self,
1626 input: &hypercall_db::HypercoreCashLedgerApply,
1627 ) -> Result<hypercall_db::HypercoreCashLedgerApplyResult> {
1628 use diesel::sql_types::{BigInt, Binary, Nullable, Numeric, Text};
1629 use diesel::Connection;
1630 use diesel::OptionalExtension;
1631 use rust_decimal::Decimal;
1632
1633 if input.amount_usdc <= Decimal::ZERO {
1634 anyhow::bail!(
1635 "hypercore cash deposit amount must be positive for {} event_hash={}",
1636 input.wallet,
1637 input.event_hash
1638 );
1639 }
1640
1641 #[derive(diesel::QueryableByName)]
1642 struct InsertedEventRow {
1643 #[diesel(sql_type = BigInt)]
1644 id: i64,
1645 }
1646
1647 #[derive(diesel::QueryableByName)]
1648 struct LedgerEventRow {
1649 #[diesel(sql_type = BigInt)]
1650 id: i64,
1651 }
1652
1653 #[derive(diesel::QueryableByName)]
1654 struct ExistingEventRow {
1655 #[diesel(sql_type = BigInt)]
1656 id: i64,
1657 #[diesel(sql_type = Nullable<BigInt>)]
1658 ledger_event_id: Option<i64>,
1659 #[diesel(sql_type = Nullable<Numeric>)]
1660 balance_after: Option<Decimal>,
1661 }
1662
1663 let mut conn = self.pool.get()?;
1664 let row = conn.transaction::<hypercall_db::HypercoreCashLedgerApplyResult, diesel::result::Error, _>(|conn| {
1665 let inserted = diesel::sql_query(
1666 "INSERT INTO hypercore_cash_ledger_events
1667 (wallet, event_hash, event_time_ms, delta_type, amount_usdc)
1668 VALUES ($1, $2, $3, 'pool_transfer', $4)
1669 ON CONFLICT (wallet, event_hash, delta_type) DO NOTHING
1670 RETURNING id",
1671 )
1672 .bind::<Binary, _>(&input.wallet)
1673 .bind::<Text, _>(&input.event_hash)
1674 .bind::<BigInt, _>(input.event_time_ms)
1675 .bind::<Numeric, _>(input.amount_usdc)
1676 .get_result::<InsertedEventRow>(conn)
1677 .optional()?;
1678
1679 let Some(inserted) = inserted else {
1680 let existing = diesel::sql_query(
1681 "SELECT id, ledger_event_id, balance_after
1682 FROM hypercore_cash_ledger_events
1683 WHERE wallet = $1 AND event_hash = $2 AND delta_type = 'pool_transfer'
1684 FOR UPDATE",
1685 )
1686 .bind::<Binary, _>(&input.wallet)
1687 .bind::<Text, _>(&input.event_hash)
1688 .get_result::<ExistingEventRow>(conn)?;
1689
1690 if let Some(ledger_event_id) = existing.ledger_event_id {
1691 return Ok(hypercall_db::HypercoreCashLedgerApplyResult {
1692 applied: false,
1693 balance_after: existing.balance_after,
1694 ledger_event_id: Some(
1695 u64::try_from(ledger_event_id).unwrap_or_else(|_| {
1696 panic!(
1697 "STATE_CORRUPTION: negative ledger_event_id {} for hypercore deposit {}",
1698 ledger_event_id, input.event_hash
1699 )
1700 }),
1701 ),
1702 });
1703 }
1704
1705 let ledger = diesel::sql_query(
1706 "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type)
1707 VALUES ($1, $2, $3, 'deposit')
1708 RETURNING id",
1709 )
1710 .bind::<Binary, _>(&input.wallet)
1711 .bind::<BigInt, _>(input.event_time_ms)
1712 .bind::<Numeric, _>(input.amount_usdc)
1713 .get_result::<LedgerEventRow>(conn)?;
1714
1715 diesel::sql_query(
1716 "UPDATE hypercore_cash_ledger_events
1717 SET ledger_event_id = $2,
1718 status = 'submitted',
1719 error = NULL,
1720 updated_at = NOW()
1721 WHERE id = $1 AND ledger_event_id IS NULL",
1722 )
1723 .bind::<BigInt, _>(existing.id)
1724 .bind::<BigInt, _>(ledger.id)
1725 .execute(conn)?;
1726
1727 return Ok(hypercall_db::HypercoreCashLedgerApplyResult {
1728 applied: true,
1729 balance_after: None,
1730 ledger_event_id: Some(u64::try_from(ledger.id).unwrap_or_else(|_| {
1731 panic!(
1732 "STATE_CORRUPTION: negative ledger_event_id {} for hypercore deposit {}",
1733 ledger.id, input.event_hash
1734 )
1735 })),
1736 });
1737 };
1738
1739 let ledger = diesel::sql_query(
1740 "INSERT INTO ledger_events (wallet, event_ts_ms, delta, event_type)
1741 VALUES ($1, $2, $3, 'deposit')
1742 RETURNING id",
1743 )
1744 .bind::<Binary, _>(&input.wallet)
1745 .bind::<BigInt, _>(input.event_time_ms)
1746 .bind::<Numeric, _>(input.amount_usdc)
1747 .get_result::<LedgerEventRow>(conn)?;
1748
1749 diesel::sql_query(
1750 "UPDATE hypercore_cash_ledger_events
1751 SET ledger_event_id = $2,
1752 status = 'submitted',
1753 error = NULL,
1754 updated_at = NOW()
1755 WHERE id = $1",
1756 )
1757 .bind::<BigInt, _>(inserted.id)
1758 .bind::<BigInt, _>(ledger.id)
1759 .execute(conn)?;
1760
1761 Ok(hypercall_db::HypercoreCashLedgerApplyResult {
1762 applied: true,
1763 balance_after: None,
1764 ledger_event_id: Some(u64::try_from(ledger.id).unwrap_or_else(|_| {
1765 panic!(
1766 "STATE_CORRUPTION: negative ledger_event_id {} for hypercore deposit {}",
1767 ledger.id, input.event_hash
1768 )
1769 })),
1770 })
1771 })?;
1772 Ok(row)
1773 }
1774
1775 pub async fn record_hypercore_cash_deposit_non_crediting(
1777 &self,
1778 input: &hypercall_db::HypercoreCashLedgerApply,
1779 ) -> Result<()> {
1780 use diesel::sql_types::{BigInt, Binary, Numeric, Text};
1781 use rust_decimal::Decimal;
1782
1783 if input.amount_usdc <= Decimal::ZERO {
1784 anyhow::bail!(
1785 "non-crediting hypercore cash deposit amount must be positive for {} event_hash={}",
1786 input.wallet,
1787 input.event_hash
1788 );
1789 }
1790
1791 let mut conn = self.pool.get()?;
1792 diesel::sql_query(
1793 "INSERT INTO hypercore_cash_ledger_events
1794 (wallet, event_hash, event_time_ms, delta_type, amount_usdc, status, error)
1795 VALUES ($1, $2, $3, 'pool_transfer', $4, 'submitted', NULL)
1796 ON CONFLICT (wallet, event_hash, delta_type) DO UPDATE
1797 SET status = CASE
1798 WHEN hypercore_cash_ledger_events.ledger_event_id IS NULL
1799 THEN 'submitted'
1800 ELSE hypercore_cash_ledger_events.status
1801 END,
1802 error = CASE
1803 WHEN hypercore_cash_ledger_events.ledger_event_id IS NULL
1804 THEN NULL
1805 ELSE hypercore_cash_ledger_events.error
1806 END,
1807 updated_at = NOW()",
1808 )
1809 .bind::<Binary, _>(&input.wallet)
1810 .bind::<Text, _>(&input.event_hash)
1811 .bind::<BigInt, _>(input.event_time_ms)
1812 .bind::<Numeric, _>(input.amount_usdc)
1813 .execute(&mut conn)?;
1814 Ok(())
1815 }
1816
1817 pub async fn record_hypercore_cash_deposit_pending_margin_mode(
1821 &self,
1822 input: &hypercall_db::HypercoreCashLedgerApply,
1823 ) -> Result<()> {
1824 use diesel::sql_types::{BigInt, Binary, Numeric, Text};
1825 use rust_decimal::Decimal;
1826
1827 if input.amount_usdc <= Decimal::ZERO {
1828 anyhow::bail!(
1829 "hypercore cash deposit amount must be positive for {} event_hash={}",
1830 input.wallet,
1831 input.event_hash
1832 );
1833 }
1834
1835 let mut conn = self.pool.get()?;
1836 diesel::sql_query(
1837 "INSERT INTO hypercore_cash_ledger_events
1838 (wallet, event_hash, event_time_ms, delta_type, amount_usdc, status, error)
1839 VALUES ($1, $2, $3, 'pool_transfer', $4, 'pending_margin_mode', 'missing margin mode')
1840 ON CONFLICT (wallet, event_hash, delta_type) DO UPDATE
1841 SET status = CASE
1842 WHEN hypercore_cash_ledger_events.ledger_event_id IS NULL
1843 THEN 'pending_margin_mode'
1844 ELSE hypercore_cash_ledger_events.status
1845 END,
1846 error = CASE
1847 WHEN hypercore_cash_ledger_events.ledger_event_id IS NULL
1848 THEN 'missing margin mode'
1849 ELSE hypercore_cash_ledger_events.error
1850 END,
1851 updated_at = NOW()",
1852 )
1853 .bind::<Binary, _>(&input.wallet)
1854 .bind::<Text, _>(&input.event_hash)
1855 .bind::<BigInt, _>(input.event_time_ms)
1856 .bind::<Numeric, _>(input.amount_usdc)
1857 .execute(&mut conn)?;
1858 Ok(())
1859 }
1860
1861 pub fn get_exchange_cash_ledger_watermark_sync(&self) -> Result<Option<i64>> {
1863 use diesel::sql_types::{BigInt, Nullable};
1864
1865 #[derive(diesel::QueryableByName)]
1866 struct Row {
1867 #[diesel(sql_type = Nullable<BigInt>)]
1868 last_event_time_ms: Option<i64>,
1869 }
1870
1871 let mut conn = self.pool.get()?;
1872 let row = diesel::sql_query(
1879 "WITH pending_uncredited AS (
1880 SELECT MIN(event_time_ms)::bigint AS event_time_ms
1881 FROM hypercore_cash_ledger_events
1882 WHERE delta_type IN ('deposit', 'pool_transfer')
1883 AND ledger_event_id IS NULL
1884 AND status = 'pending_margin_mode'
1885 ),
1886 credited AS (
1887 SELECT MAX(event_time_ms)::bigint AS event_time_ms
1888 FROM hypercore_cash_ledger_events
1889 WHERE delta_type IN ('deposit', 'pool_transfer')
1890 AND (ledger_event_id IS NOT NULL OR status = 'submitted')
1891 )
1892 SELECT COALESCE(
1893 (SELECT event_time_ms FROM pending_uncredited),
1894 (SELECT event_time_ms FROM credited)
1895 ) AS last_event_time_ms",
1896 )
1897 .get_result::<Row>(&mut conn)?;
1898 Ok(row.last_event_time_ms)
1899 }
1900
1901 pub async fn get_rsm_signer_nonce(
1903 &self,
1904 signer: &hypercall_types::WalletAddress,
1905 ) -> Result<Option<hypercall_db::RsmSignerNonceRecord>> {
1906 use diesel::OptionalExtension;
1907
1908 let mut conn = self.pool.get()?;
1909 let result = crate::schema::rsm_signer_nonces::table
1910 .filter(crate::schema::rsm_signer_nonces::signer_address.eq(signer))
1911 .first::<crate::models::RsmSignerNonceRecord>(&mut conn)
1912 .optional()?;
1913 Ok(result.map(Into::into))
1914 }
1915
1916 pub async fn get_option_instrument_for_credit(
1918 &self,
1919 token: &hypercall_types::WalletAddress,
1920 ) -> Result<Option<hypercall_db::OptionInstrumentForCredit>> {
1921 use diesel::sql_types::{BigInt, Binary, Numeric, Text};
1922 use diesel::OptionalExtension;
1923
1924 #[derive(diesel::QueryableByName)]
1925 struct InstrumentRow {
1926 #[diesel(sql_type = Text)]
1927 id: String,
1928 #[diesel(sql_type = Text)]
1929 underlying: String,
1930 #[diesel(sql_type = BigInt)]
1931 expiry: i64,
1932 #[diesel(sql_type = Numeric)]
1933 strike: rust_decimal::Decimal,
1934 #[diesel(sql_type = Text)]
1935 option_type: String,
1936 }
1937
1938 let mut conn = self.pool.get()?;
1939 let row = diesel::sql_query(
1940 "SELECT id, underlying, expiry::bigint AS expiry, strike, option_type
1941 FROM instruments
1942 WHERE option_token_address = $1
1943 LIMIT 1",
1944 )
1945 .bind::<Binary, _>(token)
1946 .get_result::<InstrumentRow>(&mut conn)
1947 .optional()?;
1948
1949 Ok(row.map(|row| hypercall_db::OptionInstrumentForCredit {
1950 symbol: row.id,
1951 underlying: row.underlying,
1952 expiry: row.expiry,
1953 strike: row.strike,
1954 option_type: row.option_type,
1955 }))
1956 }
1957}
1958
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// The `order_infos` active-symbol index migration must be present under
    /// its expected version and must run outside a transaction.
    #[test]
    fn replaces_multi_statement_concurrent_index_migration() {
        let migrations = HypercallMigrations
            .migrations()
            .expect("embedded migrations should load");
        let migration = migrations
            .iter()
            .find(|migration| {
                migration.name().to_string() == ORDER_INFOS_ACTIVE_SYMBOL_INDEX_MIGRATION
            })
            .expect("order_infos active symbol migration should be present");

        assert_eq!(migration.name().version().to_string(), "2026-06-01-000001");
        assert!(!migration.metadata().run_in_transaction());
    }

    /// Every `connect` call must ask the URL source for a URL, even when the
    /// connection attempt itself fails.
    #[test]
    fn dyn_connection_manager_calls_closure_on_connect() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let cc = call_count.clone();
        let mgr = DynConnectionManager {
            url_source: Arc::new(move || {
                cc.fetch_add(1, Ordering::SeqCst);
                "postgres://invalid-will-fail/db".to_string()
            }),
        };

        // The connection results are irrelevant here; only the calls count.
        let _ = r2d2::ManageConnection::connect(&mgr);
        let _ = r2d2::ManageConnection::connect(&mgr);
        let _ = r2d2::ManageConnection::connect(&mgr);
        assert_eq!(call_count.load(Ordering::SeqCst), 3);
    }

    /// A URL change made between two `connect` calls must be picked up by the
    /// second call.
    #[test]
    fn dyn_connection_manager_sees_url_changes() {
        let url = Arc::new(std::sync::Mutex::new(
            "postgres://first@host/db".to_string(),
        ));
        // Records every URL the manager obtained, in call order, so the test
        // can assert on what `connect` actually read.
        let seen = Arc::new(std::sync::Mutex::new(Vec::<String>::new()));

        let url_clone = url.clone();
        let seen_clone = seen.clone();
        let mgr = DynConnectionManager {
            url_source: Arc::new(move || {
                let current = url_clone.lock().unwrap().clone();
                seen_clone.lock().unwrap().push(current.clone());
                current
            }),
        };

        // Connecting is expected to fail; the URL lookup still has to happen.
        let _ = r2d2::ManageConnection::connect(&mgr);

        *url.lock().unwrap() = "postgres://second@host/db".to_string();

        let _ = r2d2::ManageConnection::connect(&mgr);

        assert_eq!(
            *seen.lock().unwrap(),
            vec![
                "postgres://first@host/db".to_string(),
                "postgres://second@host/db".to_string(),
            ]
        );
    }

    /// A manager built from password auth yields the configured URL verbatim.
    #[test]
    fn dyn_connection_manager_from_password_auth() {
        let auth = DbAuthConfig::password("postgres://u:p@host/db");
        let mgr = DynConnectionManager::new(&auth);
        let url = (mgr.url_source)();
        assert_eq!(url, "postgres://u:p@host/db");
    }

    /// A manager built from RDS IAM auth yields whatever URL the provider
    /// currently stores, including after the stored URL is replaced.
    #[cfg(feature = "rds-iam")]
    #[test]
    fn dyn_connection_manager_from_iam_auth_sees_rotation() {
        let parsed = crate::rds_iam::ParsedDbUrl::parse("postgres://user@host/db").unwrap();
        let mock_gen = crate::rds_iam::test_support::MockTokenGenerator::new(vec![]);
        let provider =
            crate::rds_iam::RdsIamTokenProvider::with_generator(parsed, Box::new(mock_gen));
        provider.store_url("postgres://user:token_v1@host/db".into());

        let auth = DbAuthConfig::rds_iam_with_provider(provider.clone());
        let mgr = DynConnectionManager::new(&auth);

        assert_eq!((mgr.url_source)(), "postgres://user:token_v1@host/db");

        provider.store_url("postgres://user:token_v2@host/db".into());
        assert_eq!((mgr.url_source)(), "postgres://user:token_v2@host/db");
    }
}