Skip to main content

hypercall/
db_handler.rs

1use anyhow::{Context, Result};
2use diesel::pg::PgConnection;
3use diesel::r2d2;
4use std::sync::Arc;
5use tracing::info;
6
7use hypercall_types::EngineMessage;
8
9use crate::journal::JournalFillSideEffect;
10
11pub use hypercall_db_diesel::DbAuthConfig;
12pub use hypercall_db_diesel::DbPool;
13pub use hypercall_db_diesel::DynConnectionManager;
14
15/// Redact credentials from a database URL for safe logging.
16/// Turns `postgres://user:pass@host:5432/db` into `postgres://***@host:5432/db`.
17fn redact_database_url(url: &str) -> String {
18    url.find('@')
19        .map(|at| format!("postgres://***@{}", &url[at + 1..]))
20        .unwrap_or_else(|| "postgres://***".to_string())
21}
22
23pub use hypercall_db::IntegrityQueryResults;
24pub use hypercall_db::LiquidationBonusApplyResult;
25pub use hypercall_db::PortfolioReplayEvent;
26pub use hypercall_db::SettlementResult;
27
28pub struct DieselEventHandler {
29    pool: Arc<DbPool>,
30    /// Delegate for event handling / batch cancel / fill persistence.
31    db_handler: hypercall_db_diesel::DatabaseHandler,
32}
33
34impl DieselEventHandler {
35    pub fn new(database_url: &str) -> Result<Self> {
36        Self::new_with_pool_size(database_url, 3)
37    }
38
39    pub fn new_with_pool_size(database_url: &str, pool_max: u32) -> Result<Self> {
40        Self::new_with_auth(DbAuthConfig::password(database_url), pool_max)
41    }
42
43    pub fn new_with_auth(auth: DbAuthConfig, pool_max: u32) -> Result<Self> {
44        let handler = Self::new_pool_only(&auth, pool_max)?;
45        handler.run_migrations().with_context(|| {
46            format!(
47                "Failed to run migrations for database (auth={:?}): {}",
48                auth,
49                redact_database_url(&auth.current_url())
50            )
51        })?;
52        Ok(handler)
53    }
54
55    /// Create a DieselEventHandler with a connection pool but skip migrations.
56    pub fn new_readonly(database_url: &str, pool_max: u32) -> Result<Self> {
57        Self::new_readonly_auth(DbAuthConfig::password(database_url), pool_max)
58    }
59
60    pub fn new_readonly_auth(auth: DbAuthConfig, pool_max: u32) -> Result<Self> {
61        let redacted = redact_database_url(&auth.current_url());
62        info!(
63            "Initializing readonly DieselEventHandler (migrations skipped): {}",
64            redacted
65        );
66        Self::new_pool_only(&auth, pool_max)
67    }
68
69    /// Internal: create the pool without running migrations.
70    fn new_pool_only(auth: &DbAuthConfig, pool_max: u32) -> Result<Self> {
71        let redacted = redact_database_url(&auth.current_url());
72        info!(
73            "Initializing DieselEventHandler for PostgreSQL: {}",
74            redacted
75        );
76        let pool = Arc::new(
77            hypercall_db_diesel::build_db_pool(auth, pool_max, 30_000, 10_000).with_context(
78                || {
79                    format!(
80                        "Failed to create connection pool for database: {}",
81                        redacted
82                    )
83                },
84            )?,
85        );
86        let db_handler =
87            hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(pool.clone());
88
89        Ok(Self { pool, db_handler })
90    }
91
92    /// Get a connection from the pool
93    pub async fn get_connection(&self) -> Result<r2d2::PooledConnection<DynConnectionManager>> {
94        self.pool
95            .get()
96            .with_context(|| "Failed to get connection from pool")
97    }
98
99    /// Get the connection pool.
100    pub fn pool(&self) -> Arc<DbPool> {
101        self.pool.clone()
102    }
103
104    /// Get a clone of the connection pool (not the Arc).
105    pub fn get_pool(&self) -> DbPool {
106        (*self.pool).clone()
107    }
108
109    /// Create DieselEventHandler with an existing pool
110    pub fn with_pool(pool: Arc<DbPool>) -> Result<Self> {
111        info!("Creating DieselEventHandler with existing pool");
112
113        let db_handler =
114            hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(pool.clone());
115        let handler = Self { pool, db_handler };
116
117        handler
118            .run_migrations()
119            .with_context(|| "Failed to run migrations")?;
120
121        Ok(handler)
122    }
123
124    /// Create DieselEventHandler with an existing pool, skipping migrations.
125    pub fn with_pool_no_migrations(pool: Arc<DbPool>) -> Self {
126        info!("Creating DieselEventHandler with existing pool (migrations skipped)");
127        let db_handler =
128            hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(pool.clone());
129        Self { pool, db_handler }
130    }
131
132    /// Get a reference to the underlying `DatabaseHandler`.
133    pub fn db_handler(&self) -> &hypercall_db_diesel::DatabaseHandler {
134        &self.db_handler
135    }
136
137    pub fn run_migrations(&self) -> Result<()> {
138        self.db_handler.run_migrations()
139    }
140
141    // ===== Event handling: delegates to DatabaseHandler =====
142
143    pub async fn handle_event(&self, event: &EngineMessage) -> Result<()> {
144        let mut conn = self.get_connection().await?;
145        self.db_handler.handle_event_with_conn(&mut conn, event)
146    }
147
148    pub fn handle_event_with_conn(
149        &self,
150        conn: &mut PgConnection,
151        event: &EngineMessage,
152    ) -> Result<()> {
153        self.db_handler.handle_event_with_conn(conn, event)
154    }
155
156    pub async fn handle_event_batch(&self, events: Vec<EngineMessage>) -> Result<()> {
157        self.db_handler.handle_event_batch_sync(&events)
158    }
159
160    // ===== Batch cancel: thin wrapper converting StartupExpiredOrderCancel =====
161
162    /// Batch-insert CANCELED rows into order_updates for orders on expired instruments
163    /// discovered during replay. Delegates to `DatabaseHandler::batch_cancel_expired_orders_sync`.
164    pub(crate) fn batch_cancel_expired_orders_sync(
165        &self,
166        cancels: &[crate::rsm::unified_engine::recovery::StartupExpiredOrderCancel],
167        reason: &str,
168    ) -> Result<usize> {
169        let portable: Vec<hypercall_db_diesel::event_handler::ExpiredOrderCancel> = cancels
170            .iter()
171            .map(|c| hypercall_db_diesel::event_handler::ExpiredOrderCancel {
172                order_id: c.order_id,
173                wallet: c.wallet,
174                symbol: c.symbol.clone(),
175                price: c.price,
176                size: c.size,
177                side: c.side,
178                tif: c.tif,
179                is_perp: c.is_perp,
180                underlying: c.underlying.clone(),
181                reduce_only: c.reduce_only,
182                nonce: c.nonce,
183                signature: c.signature.clone(),
184                mmp_enabled: c.mmp_enabled,
185                filled_size: c.filled_size,
186                client_id: c.client_id.clone(),
187                timestamp: c.timestamp,
188            })
189            .collect();
190        self.db_handler
191            .batch_cancel_expired_orders_sync(&portable, reason)
192    }
193
194    // ===== Fill persistence: delegates to free function in event_handler =====
195
196    pub(crate) fn persist_fill_with_side_effects_in_tx(
197        conn: &mut PgConnection,
198        fill: &hypercall_types::Fill,
199        side_effects: &JournalFillSideEffect,
200    ) -> Result<(bool, bool)> {
201        let portable_side_effects = hypercall_db_diesel::event_handler::FillSideEffect {
202            trade_id: side_effects.trade_id,
203            taker_ledger_delta: side_effects.taker_ledger_delta,
204            maker_ledger_delta: side_effects.maker_ledger_delta,
205            taker_premium_delta: side_effects.taker_premium_delta,
206            maker_premium_delta: side_effects.maker_premium_delta,
207            underlying_notional: side_effects.underlying_notional,
208        };
209        hypercall_db_diesel::event_handler::persist_legacy_replay_fill_with_side_effects_in_tx(
210            conn,
211            fill,
212            &portable_side_effects,
213        )
214    }
215}
216
217#[async_trait::async_trait]
218impl hypercall_runtime_api::EngineEventPersistence for DieselEventHandler {
219    async fn handle_event(&self, event: &hypercall_types::EngineMessage) -> anyhow::Result<()> {
220        self.handle_event(event).await
221    }
222}