1use crate::db_handler::{DbAuthConfig, DbPool};
2use anyhow::Context;
3use hypercall_db::{InstrumentReader, InstrumentWriter};
4use hypercall_db_diesel::{DatabaseHandler, DieselDb};
5use std::sync::Arc;
6use std::time::{Instant, SystemTime, UNIX_EPOCH};
7use tracing::{error, info, warn};
8
/// Database handles and startup flags assembled by `build_database_resources`.
pub struct DatabaseResources {
    /// Connection URL the engine should use. In standby mode this is
    /// `DATABASE_URL_READONLY` when that variable is set; in every other case
    /// it equals the read-write `DATABASE_URL`.
    pub engine_database_url: String,
    /// Authentication config derived from the read-write `DATABASE_URL`.
    pub db_auth: DbAuthConfig,
    /// Authentication config for `engine_database_url`; a clone of `db_auth`
    /// when both URLs are identical.
    pub engine_db_auth: DbAuthConfig,
    /// Synchronous connection pool, built from `db_auth`.
    pub db_pool: Arc<DbPool>,
    /// Async `DieselDb` pool, built from `db_auth`.
    pub diesel_db: Arc<DieselDb>,
    /// Shared `DatabaseHandler` wrapping `db_pool`.
    pub shared_db: Arc<DatabaseHandler>,
    /// `true` when the `STANDBY_MODE` environment variable is `"true"` or `"1"`.
    pub standby_mode_active: bool,
    /// `true` when the `SKIP_DB_MIGRATIONS` environment variable is `"true"` or `"1"`.
    pub skip_db_migrations: bool,
}
19
20pub async fn build_database_resources(
21 runtime: &crate::backend_config::BackendRuntime,
22) -> anyhow::Result<DatabaseResources> {
23 let rw_database_url = runtime
25 .secrets
26 .require_database_url()
27 .context("Integrated server requires DATABASE_URL")?
28 .to_string();
29
30 let standby_mode_active = std::env::var("STANDBY_MODE")
34 .map(|v| v == "true" || v == "1")
35 .unwrap_or(false);
36 let skip_db_migrations = std::env::var("SKIP_DB_MIGRATIONS")
37 .map(|v| v == "true" || v == "1")
38 .unwrap_or(false);
39
40 let engine_database_url = if standby_mode_active {
41 if let Ok(ro_url) = std::env::var("DATABASE_URL_READONLY") {
42 info!("Standby mode: using DATABASE_URL_READONLY for engine replay");
43 ro_url
44 } else {
45 warn!("Standby mode: DATABASE_URL_READONLY not set, engine replay uses DATABASE_URL");
46 rw_database_url.clone()
47 }
48 } else {
49 rw_database_url.clone()
50 };
51 let database_url = rw_database_url.clone();
52
53 let db_auth = DbAuthConfig::from_env(&database_url)
55 .await
56 .context("Failed to initialize database authentication")?;
57 tracing::info!(auth = ?db_auth, "Database auth mode");
58
59 let engine_db_auth = if engine_database_url != database_url {
61 DbAuthConfig::from_env(&engine_database_url)
62 .await
63 .context("Failed to initialize engine database authentication")?
64 } else {
65 db_auth.clone()
66 };
67
68 let diesel_max = runtime.config.database.pool.diesel_max;
69 tracing::info!(diesel_max, "Configuring database connection pools");
70
71 if diesel_max < 3 {
74 anyhow::bail!(
75 "database.pool.diesel_max must be >= 3 (got {}); sync pool needs >=1 and async pool needs >=2 (catalog advisory lock)",
76 diesel_max
77 );
78 }
79 let diesel_async_max = diesel_max / 3;
83 let diesel_async_max = diesel_async_max.max(2);
86 let diesel_sync_max = diesel_max - diesel_async_max;
87 assert!(
88 diesel_sync_max >= 1,
89 "sync pool must have at least 1 connection"
90 );
91 tracing::info!(
92 diesel_sync_max,
93 diesel_async_max,
94 "Splitting diesel pool budget"
95 );
96
97 let t = Instant::now();
99 let db_pool: Arc<DbPool> = Arc::new(
100 hypercall_db_diesel::build_db_pool(&db_auth, diesel_sync_max, 30_000, 10_000)
101 .map_err(|e| anyhow::anyhow!("Failed to create connection pool: {}", e))?,
102 );
103 record_startup_timing("diesel_pool_create", t.elapsed(), None);
104
105 let t_async_pool = Instant::now();
107 let diesel_db = Arc::new(
108 DieselDb::new_with_auth(db_auth.clone(), diesel_async_max as usize)
109 .await
110 .context("Failed to create async DieselDb pool")?,
111 );
112 record_startup_timing("diesel_async_pool_create", t_async_pool.elapsed(), None);
113
114 let shared_db = build_shared_db(db_pool.clone(), standby_mode_active, skip_db_migrations)?;
115 run_startup_migrations_and_reconciliation(
116 shared_db.clone(),
117 standby_mode_active,
118 skip_db_migrations,
119 )?;
120
121 Ok(DatabaseResources {
122 engine_database_url,
123 db_auth,
124 engine_db_auth,
125 db_pool,
126 diesel_db,
127 shared_db,
128 standby_mode_active,
129 skip_db_migrations,
130 })
131}
132
/// Splits a database URL into its authority (`[userinfo@]host[:port]`) and
/// everything that follows it (path, query and fragment).
///
/// A leading `scheme://` is dropped when the part before it looks like a URL
/// scheme. The authority ends at the first `/`, `?` or `#`, so an `@` or `/`
/// that appears later in the query string is never mistaken for the
/// credentials separator or the start of the path.
fn split_db_url_authority(url: &str) -> (&str, &str) {
    let after_scheme = match url.split_once("://") {
        Some((scheme, rest))
            if !scheme.is_empty()
                && scheme
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.')) =>
        {
            rest
        }
        _ => url,
    };
    let authority_end = after_scheme
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(after_scheme.len());
    after_scheme.split_at(authority_end)
}

/// Returns the host name from a database URL, without credentials or port.
///
/// * `None` yields `"not_set"`.
/// * A URL whose authority has no `user[:password]@` part yields `"unknown"`.
/// * A bracketed IPv6 literal is returned with its brackets (`"[::1]"`);
///   an unterminated bracket yields `"unknown"`.
pub fn extract_db_host(database_url: &Option<String>) -> String {
    let url = match database_url.as_deref() {
        Some(url) => url,
        None => return "not_set".to_string(),
    };

    let (authority, _) = split_db_url_authority(url);
    // The host follows the last '@' of the authority; URLs without
    // credentials are reported as "unknown".
    let host_and_port = match authority.rsplit_once('@') {
        Some((_, host_and_port)) => host_and_port,
        None => return "unknown".to_string(),
    };

    let host = if host_and_port.starts_with('[') {
        // IPv6 literal: the colons inside the brackets are not port separators.
        match host_and_port.find(']') {
            Some(close) => &host_and_port[..=close],
            None => "unknown",
        }
    } else {
        host_and_port.split(':').next().unwrap_or("unknown")
    };
    host.to_string()
}

/// Returns the database name (first path segment) from a database URL.
///
/// * `None` yields `"not_set"`.
/// * A URL whose authority has no `user[:password]@` part, or that has no
///   path after the authority, yields `"unknown"`.
/// * Query string and fragment are never part of the returned name.
/// * A URL that ends with a bare `/` yields an empty string.
pub fn extract_db_name(database_url: &Option<String>) -> String {
    let url = match database_url.as_deref() {
        Some(url) => url,
        None => return "not_set".to_string(),
    };

    let (authority, remainder) = split_db_url_authority(url);
    if !authority.contains('@') {
        return "unknown".to_string();
    }

    match remainder.strip_prefix('/') {
        Some(path) => path
            .split(|c: char| matches!(c, '/' | '?' | '#'))
            .next()
            .unwrap_or("unknown")
            .to_string(),
        // The authority is followed directly by a query/fragment or nothing.
        None => "unknown".to_string(),
    }
}
174
175fn build_shared_db(
176 db_pool: Arc<DbPool>,
177 standby_mode_active: bool,
178 skip_db_migrations: bool,
179) -> anyhow::Result<Arc<DatabaseHandler>> {
180 if standby_mode_active || skip_db_migrations {
185 Ok(Arc::new(DatabaseHandler::with_pool_no_migrations(db_pool)))
186 } else {
187 Ok(Arc::new(DatabaseHandler::with_pool(db_pool).map_err(
188 |error| {
189 error!("FATAL: Failed to create shared Diesel handler: {}", error);
190 anyhow::anyhow!("Failed to create Diesel handler: {}", error)
191 },
192 )?))
193 }
194}
195
196fn run_startup_migrations_and_reconciliation(
197 shared_db: Arc<DatabaseHandler>,
198 standby_mode_active: bool,
199 skip_db_migrations: bool,
200) -> anyhow::Result<()> {
201 if !standby_mode_active && !skip_db_migrations {
202 let t = Instant::now();
204 shared_db.run_migrations().map_err(|error| {
205 error!("FATAL: Failed to run database migrations: {}", error);
206 anyhow::anyhow!("Failed to run migrations: {}", error)
207 })?;
208 record_startup_timing("migrations", t.elapsed(), None);
209 return Ok(());
210 }
211
212 if skip_db_migrations {
213 info!("SKIP_DB_MIGRATIONS enabled: skipping diesel migrations");
214 }
215
216 if !standby_mode_active {
217 return Ok(());
218 }
219
220 info!("Standby mode: skipping diesel migrations (readonly pool)");
221
222 let t = Instant::now();
226 let reconciled = reconcile_expired_active_instruments_for_standby_startup(shared_db.as_ref())?;
227 metrics::gauge!("ht_standby_startup_expired_active_reconciled").set(reconciled as f64);
228 if reconciled > 0 {
229 warn!(
230 reconciled,
231 "Standby startup reconciled expired ACTIVE instruments before readonly engine load"
232 );
233 }
234 record_startup_timing(
235 "standby_expired_active_reconciliation",
236 t.elapsed(),
237 Some(format!("reconciled={}", reconciled)),
238 );
239 Ok(())
240}
241
242fn reconcile_expired_active_instruments_for_standby_startup(
243 db: &DatabaseHandler,
244) -> anyhow::Result<usize> {
245 let now_secs = SystemTime::now()
246 .duration_since(UNIX_EPOCH)
247 .context("system clock is before UNIX_EPOCH")?
248 .as_secs();
249 let expired_active = db
250 .get_active_instruments_expired_by_sync(now_secs)
251 .context("failed to query expired ACTIVE instruments for standby startup reconciliation")?;
252 if expired_active.is_empty() {
253 return Ok(0);
254 }
255
256 let symbols: Vec<String> = expired_active
257 .into_iter()
258 .map(|instrument| instrument.id)
259 .collect();
260 db.transition_active_instruments_to_expired_pending_sync(&symbols)
261 .with_context(|| {
262 format!(
263 "failed to transition expired ACTIVE instruments to EXPIRED_PENDING_PRICE during standby startup reconciliation: {:?}",
264 symbols
265 )
266 })
267}
268
269fn record_startup_timing(
270 phase: &'static str,
271 elapsed: std::time::Duration,
272 detail: Option<String>,
273) {
274 match detail {
275 Some(detail) => info!("[startup-timing] {}: {:?} ({})", phase, elapsed, detail),
276 None => info!("[startup-timing] {}: {:?}", phase, elapsed),
277 }
278 metrics::gauge!("ht_startup_phase_seconds", "phase" => phase).set(elapsed.as_secs_f64());
279}
280
#[cfg(test)]
mod tests {
    use super::{extract_db_host, extract_db_name};

    /// Credentials, port and database path are stripped from the host.
    #[test]
    fn extracts_db_host_from_standard_url() {
        let database_url = Some(String::from(
            "postgres://user:pass@db.example.com:5432/hypercall",
        ));

        let host = extract_db_host(&database_url);

        assert_eq!(host, "db.example.com");
    }

    /// The query string is not part of the database name.
    #[test]
    fn extracts_db_name_without_query_params() {
        let database_url = Some(String::from(
            "postgres://user:pass@db.example.com:5432/hypercall?sslmode=require",
        ));

        let name = extract_db_name(&database_url);

        assert_eq!(name, "hypercall");
    }

    /// A missing URL is reported as `not_set` by both helpers.
    #[test]
    fn reports_not_set_for_missing_database_url() {
        let missing: Option<String> = None;

        assert_eq!(extract_db_host(&missing), "not_set");
        assert_eq!(extract_db_name(&missing), "not_set");
    }

    /// A URL without a credentials section is reported as `unknown`.
    #[test]
    fn reports_unknown_for_unparseable_database_url() {
        let database_url = Some(String::from("postgres://db.example.com/hypercall"));

        assert_eq!(extract_db_host(&database_url), "unknown");
        assert_eq!(extract_db_name(&database_url), "unknown");
    }
}
313}