1use anyhow::{Context, Result};
2use diesel::pg::PgConnection;
3use diesel::r2d2;
4use std::sync::Arc;
5use tracing::info;
6
7use hypercall_types::EngineMessage;
8
9use crate::journal::JournalFillSideEffect;
10
11pub use hypercall_db_diesel::DbAuthConfig;
12pub use hypercall_db_diesel::DbPool;
13pub use hypercall_db_diesel::DynConnectionManager;
14
/// Returns a log-safe rendering of a database URL with the credentials removed.
///
/// The userinfo part (everything up to the `@` that ends it) is replaced by
/// `***`, and the output is always prefixed with `postgres://` regardless of
/// the input scheme. When no userinfo separator is found in the authority the
/// whole URL is hidden and `"postgres://***"` is returned.
///
/// The `@` is searched for only inside the authority component (after an
/// optional `scheme://`, up to the first `/`, `?` or `#`), and the *last* `@`
/// there is used. Splitting on the first `@` of the whole string would leak the
/// tail of a password that itself contains an unescaped `@`.
///
/// NOTE(review): path and query are emitted verbatim, so a `password=` query
/// parameter would not be redacted by this function.
fn redact_database_url(url: &str) -> String {
    // Drop the scheme if present; scheme-less input is treated as starting
    // directly with the authority.
    let after_scheme = url.find("://").map_or(url, |idx| &url[idx + 3..]);

    // The authority ends at the first path, query or fragment delimiter.
    let authority_len = after_scheme
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(after_scheme.len());
    let (authority, rest) = after_scheme.split_at(authority_len);

    match authority.rfind('@') {
        Some(at) => format!("postgres://***@{}{}", &authority[at + 1..], rest),
        None => "postgres://***".to_string(),
    }
}
22
23pub use hypercall_db::IntegrityQueryResults;
24pub use hypercall_db::LiquidationBonusApplyResult;
25pub use hypercall_db::PortfolioReplayEvent;
26pub use hypercall_db::SettlementResult;
27
28pub struct DieselEventHandler {
29 pool: Arc<DbPool>,
30 db_handler: hypercall_db_diesel::DatabaseHandler,
32}
33
34impl DieselEventHandler {
35 pub fn new(database_url: &str) -> Result<Self> {
36 Self::new_with_pool_size(database_url, 3)
37 }
38
39 pub fn new_with_pool_size(database_url: &str, pool_max: u32) -> Result<Self> {
40 Self::new_with_auth(DbAuthConfig::password(database_url), pool_max)
41 }
42
43 pub fn new_with_auth(auth: DbAuthConfig, pool_max: u32) -> Result<Self> {
44 let handler = Self::new_pool_only(&auth, pool_max)?;
45 handler.run_migrations().with_context(|| {
46 format!(
47 "Failed to run migrations for database (auth={:?}): {}",
48 auth,
49 redact_database_url(&auth.current_url())
50 )
51 })?;
52 Ok(handler)
53 }
54
55 pub fn new_readonly(database_url: &str, pool_max: u32) -> Result<Self> {
57 Self::new_readonly_auth(DbAuthConfig::password(database_url), pool_max)
58 }
59
60 pub fn new_readonly_auth(auth: DbAuthConfig, pool_max: u32) -> Result<Self> {
61 let redacted = redact_database_url(&auth.current_url());
62 info!(
63 "Initializing readonly DieselEventHandler (migrations skipped): {}",
64 redacted
65 );
66 Self::new_pool_only(&auth, pool_max)
67 }
68
69 fn new_pool_only(auth: &DbAuthConfig, pool_max: u32) -> Result<Self> {
71 let redacted = redact_database_url(&auth.current_url());
72 info!(
73 "Initializing DieselEventHandler for PostgreSQL: {}",
74 redacted
75 );
76 let pool = Arc::new(
77 hypercall_db_diesel::build_db_pool(auth, pool_max, 30_000, 10_000).with_context(
78 || {
79 format!(
80 "Failed to create connection pool for database: {}",
81 redacted
82 )
83 },
84 )?,
85 );
86 let db_handler =
87 hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(pool.clone());
88
89 Ok(Self { pool, db_handler })
90 }
91
92 pub async fn get_connection(&self) -> Result<r2d2::PooledConnection<DynConnectionManager>> {
94 self.pool
95 .get()
96 .with_context(|| "Failed to get connection from pool")
97 }
98
99 pub fn pool(&self) -> Arc<DbPool> {
101 self.pool.clone()
102 }
103
104 pub fn get_pool(&self) -> DbPool {
106 (*self.pool).clone()
107 }
108
109 pub fn with_pool(pool: Arc<DbPool>) -> Result<Self> {
111 info!("Creating DieselEventHandler with existing pool");
112
113 let db_handler =
114 hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(pool.clone());
115 let handler = Self { pool, db_handler };
116
117 handler
118 .run_migrations()
119 .with_context(|| "Failed to run migrations")?;
120
121 Ok(handler)
122 }
123
124 pub fn with_pool_no_migrations(pool: Arc<DbPool>) -> Self {
126 info!("Creating DieselEventHandler with existing pool (migrations skipped)");
127 let db_handler =
128 hypercall_db_diesel::DatabaseHandler::with_pool_no_migrations(pool.clone());
129 Self { pool, db_handler }
130 }
131
132 pub fn db_handler(&self) -> &hypercall_db_diesel::DatabaseHandler {
134 &self.db_handler
135 }
136
137 pub fn run_migrations(&self) -> Result<()> {
138 self.db_handler.run_migrations()
139 }
140
141 pub async fn handle_event(&self, event: &EngineMessage) -> Result<()> {
144 let mut conn = self.get_connection().await?;
145 self.db_handler.handle_event_with_conn(&mut conn, event)
146 }
147
148 pub fn handle_event_with_conn(
149 &self,
150 conn: &mut PgConnection,
151 event: &EngineMessage,
152 ) -> Result<()> {
153 self.db_handler.handle_event_with_conn(conn, event)
154 }
155
156 pub async fn handle_event_batch(&self, events: Vec<EngineMessage>) -> Result<()> {
157 self.db_handler.handle_event_batch_sync(&events)
158 }
159
160 pub(crate) fn batch_cancel_expired_orders_sync(
165 &self,
166 cancels: &[crate::rsm::unified_engine::recovery::StartupExpiredOrderCancel],
167 reason: &str,
168 ) -> Result<usize> {
169 let portable: Vec<hypercall_db_diesel::event_handler::ExpiredOrderCancel> = cancels
170 .iter()
171 .map(|c| hypercall_db_diesel::event_handler::ExpiredOrderCancel {
172 order_id: c.order_id,
173 wallet: c.wallet,
174 symbol: c.symbol.clone(),
175 price: c.price,
176 size: c.size,
177 side: c.side,
178 tif: c.tif,
179 is_perp: c.is_perp,
180 underlying: c.underlying.clone(),
181 reduce_only: c.reduce_only,
182 nonce: c.nonce,
183 signature: c.signature.clone(),
184 mmp_enabled: c.mmp_enabled,
185 filled_size: c.filled_size,
186 client_id: c.client_id.clone(),
187 timestamp: c.timestamp,
188 })
189 .collect();
190 self.db_handler
191 .batch_cancel_expired_orders_sync(&portable, reason)
192 }
193
194 pub(crate) fn persist_fill_with_side_effects_in_tx(
197 conn: &mut PgConnection,
198 fill: &hypercall_types::Fill,
199 side_effects: &JournalFillSideEffect,
200 ) -> Result<(bool, bool)> {
201 let portable_side_effects = hypercall_db_diesel::event_handler::FillSideEffect {
202 trade_id: side_effects.trade_id,
203 taker_ledger_delta: side_effects.taker_ledger_delta,
204 maker_ledger_delta: side_effects.maker_ledger_delta,
205 taker_premium_delta: side_effects.taker_premium_delta,
206 maker_premium_delta: side_effects.maker_premium_delta,
207 underlying_notional: side_effects.underlying_notional,
208 };
209 hypercall_db_diesel::event_handler::persist_legacy_replay_fill_with_side_effects_in_tx(
210 conn,
211 fill,
212 &portable_side_effects,
213 )
214 }
215}
216
217#[async_trait::async_trait]
218impl hypercall_runtime_api::EngineEventPersistence for DieselEventHandler {
219 async fn handle_event(&self, event: &hypercall_types::EngineMessage) -> anyhow::Result<()> {
220 self.handle_event(event).await
221 }
222}