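//! hypercall/startup/database.rs
//!
//! Startup wiring for database connection pools, schema migrations, and
//! standby-mode instrument reconciliation.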

1use crate::db_handler::{DbAuthConfig, DbPool};
2use anyhow::Context;
3use hypercall_db::{InstrumentReader, InstrumentWriter};
4use hypercall_db_diesel::{DatabaseHandler, DieselDb};
5use std::sync::Arc;
6use std::time::{Instant, SystemTime, UNIX_EPOCH};
7use tracing::{error, info, warn};
8
/// Database handles and startup flags produced by [`build_database_resources`].
pub struct DatabaseResources {
    /// URL the engine replay path connects to: `DATABASE_URL_READONLY` in
    /// standby mode when it is provided, otherwise the read-write `DATABASE_URL`.
    pub engine_database_url: String,
    /// Auth config resolved from the read-write `DATABASE_URL`; both
    /// `db_pool` and `diesel_db` are built with it.
    pub db_auth: DbAuthConfig,
    /// Auth config for `engine_database_url`; a clone of `db_auth` when the
    /// engine URL equals the read-write URL.
    pub engine_db_auth: DbAuthConfig,
    /// Sync pool backing `shared_db` (engine pipeline), sized to
    /// `database.pool.diesel_max` minus the async pool's share.
    pub db_pool: Arc<DbPool>,
    /// Async pool for API query handlers, sized to `diesel_max / 3`
    /// (never fewer than 2 connections).
    pub diesel_db: Arc<DieselDb>,
    /// Shared handler wrapping `db_pool`.
    pub shared_db: Arc<DatabaseHandler>,
    /// Whether `STANDBY_MODE` was enabled at startup.
    pub standby_mode_active: bool,
    /// Whether `SKIP_DB_MIGRATIONS` was enabled at startup.
    pub skip_db_migrations: bool,
}
19
20pub async fn build_database_resources(
21    runtime: &crate::backend_config::BackendRuntime,
22) -> anyhow::Result<DatabaseResources> {
23    // Read-write database URL (always required for promote).
24    let rw_database_url = runtime
25        .secrets
26        .require_database_url()
27        .context("Integrated server requires DATABASE_URL")?
28        .to_string();
29
30    // In standby mode, the engine replay path can use DATABASE_URL_READONLY,
31    // but API pools must be read-write once the process is promoted. They are
32    // long-lived inside AppState and cannot be safely left bound to the replica.
33    let standby_mode_active = std::env::var("STANDBY_MODE")
34        .map(|v| v == "true" || v == "1")
35        .unwrap_or(false);
36    let skip_db_migrations = std::env::var("SKIP_DB_MIGRATIONS")
37        .map(|v| v == "true" || v == "1")
38        .unwrap_or(false);
39
40    let engine_database_url = if standby_mode_active {
41        if let Ok(ro_url) = std::env::var("DATABASE_URL_READONLY") {
42            info!("Standby mode: using DATABASE_URL_READONLY for engine replay");
43            ro_url
44        } else {
45            warn!("Standby mode: DATABASE_URL_READONLY not set, engine replay uses DATABASE_URL");
46            rw_database_url.clone()
47        }
48    } else {
49        rw_database_url.clone()
50    };
51    let database_url = rw_database_url.clone();
52
53    // Resolve database authentication mode (password vs RDS IAM).
54    let db_auth = DbAuthConfig::from_env(&database_url)
55        .await
56        .context("Failed to initialize database authentication")?;
57    tracing::info!(auth = ?db_auth, "Database auth mode");
58
59    // Build a separate auth config for the engine when standby uses a readonly URL.
60    let engine_db_auth = if engine_database_url != database_url {
61        DbAuthConfig::from_env(&engine_database_url)
62            .await
63            .context("Failed to initialize engine database authentication")?
64    } else {
65        db_auth.clone()
66    };
67
68    let diesel_max = runtime.config.database.pool.diesel_max;
69    tracing::info!(diesel_max, "Configuring database connection pools");
70
71    // Split diesel_max between the sync (engine pipeline) and async (API queries)
72    // pools so total connections stay within the configured budget.
73    if diesel_max < 3 {
74        anyhow::bail!(
75            "database.pool.diesel_max must be >= 3 (got {}); sync pool needs >=1 and async pool needs >=2 (catalog advisory lock)",
76            diesel_max
77        );
78    }
79    // Give sync pool the larger share: engine pipeline (fills, settlements,
80    // WAL replay) is latency-critical and often holds connections longer than
81    // the async API read path.
82    let diesel_async_max = diesel_max / 3;
83    // Minimum 2: CatalogManager holds an advisory lock connection while
84    // reconciliation queries use a second connection from the same pool.
85    let diesel_async_max = diesel_async_max.max(2);
86    let diesel_sync_max = diesel_max - diesel_async_max;
87    assert!(
88        diesel_sync_max >= 1,
89        "sync pool must have at least 1 connection"
90    );
91    tracing::info!(
92        diesel_sync_max,
93        diesel_async_max,
94        "Splitting diesel pool budget"
95    );
96
97    // Sync r2d2 pool for DieselEventHandler (engine pipeline, settlements, etc.)
98    let t = Instant::now();
99    let db_pool: Arc<DbPool> = Arc::new(
100        hypercall_db_diesel::build_db_pool(&db_auth, diesel_sync_max, 30_000, 10_000)
101            .map_err(|e| anyhow::anyhow!("Failed to create connection pool: {}", e))?,
102    );
103    record_startup_timing("diesel_pool_create", t.elapsed(), None);
104
105    // Async deadpool for DieselDb (API query handlers)
106    let t_async_pool = Instant::now();
107    let diesel_db = Arc::new(
108        DieselDb::new_with_auth(db_auth.clone(), diesel_async_max as usize)
109            .await
110            .context("Failed to create async DieselDb pool")?,
111    );
112    record_startup_timing("diesel_async_pool_create", t_async_pool.elapsed(), None);
113
114    let shared_db = build_shared_db(db_pool.clone(), standby_mode_active, skip_db_migrations)?;
115    run_startup_migrations_and_reconciliation(
116        shared_db.clone(),
117        standby_mode_active,
118        skip_db_migrations,
119    )?;
120
121    Ok(DatabaseResources {
122        engine_database_url,
123        db_auth,
124        engine_db_auth,
125        db_pool,
126        diesel_db,
127        shared_db,
128        standby_mode_active,
129        skip_db_migrations,
130    })
131}
132
/// Returns the host part of a `scheme://user:pass@host[:port][/db][?params]`
/// database URL, for logging and diagnostics.
///
/// * `None` → `"not_set"`.
/// * A URL without exactly one `@` (no credentials, or an unencoded `@` in the
///   password or query) → `"unknown"`, so an ambiguous URL never yields a
///   misleading host.
/// * An empty host (e.g. `user@/db`) → `"unknown"`.
/// * Bracketed IPv6 literals keep their brackets: `user@[::1]:5432/db` →
///   `"[::1]"`.
///
/// The port, path, query string, and fragment are never part of the result.
pub fn extract_db_host(database_url: &Option<String>) -> String {
    let Some(url) = database_url.as_deref() else {
        return "not_set".to_string();
    };
    let Some((_, after_at)) = url.split_once('@').filter(|(_, rest)| !rest.contains('@')) else {
        return "unknown".to_string();
    };

    // The authority (host[:port]) ends at the first path, query, or fragment
    // delimiter, so `host?sslmode=require` yields just `host`.
    let authority = after_at
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or("");

    let host = if authority.starts_with('[') {
        // IPv6 literal: its colons are not port separators, so cut at `]`.
        authority.find(']').map_or("", |end| &authority[..=end])
    } else {
        authority.split(':').next().unwrap_or("")
    };

    if host.is_empty() {
        "unknown".to_string()
    } else {
        host.to_string()
    }
}
153
/// Returns the database name (first path segment) of a
/// `scheme://user:pass@host[:port]/db[?params]` database URL, for logging and
/// diagnostics.
///
/// * `None` → `"not_set"`.
/// * A URL without exactly one `@` → `"unknown"`.
/// * No path, or an empty first path segment (e.g. `host:5432/`) → `"unknown"`.
///
/// The query string and fragment are stripped before the path is split, so a
/// `/` inside a parameter value is never mistaken for the database name.
pub fn extract_db_name(database_url: &Option<String>) -> String {
    let Some(url) = database_url.as_deref() else {
        return "not_set".to_string();
    };
    let Some((_, after_at)) = url.split_once('@').filter(|(_, rest)| !rest.contains('@')) else {
        return "unknown".to_string();
    };

    let host_and_path = after_at
        .split(|c: char| matches!(c, '?' | '#'))
        .next()
        .unwrap_or("");

    match host_and_path.split('/').nth(1) {
        Some(name) if !name.is_empty() => name.to_string(),
        _ => "unknown".to_string(),
    }
}
174
175fn build_shared_db(
176    db_pool: Arc<DbPool>,
177    standby_mode_active: bool,
178    skip_db_migrations: bool,
179) -> anyhow::Result<Arc<DatabaseHandler>> {
180    // Create a single shared DieselEventHandler using the shared pool.
181    // In standby mode, skip migrations because the readonly replica cannot run DDL.
182    // In production, migrations can be run through an explicit bootstrap path
183    // before the runtime user starts.
184    if standby_mode_active || skip_db_migrations {
185        Ok(Arc::new(DatabaseHandler::with_pool_no_migrations(db_pool)))
186    } else {
187        Ok(Arc::new(DatabaseHandler::with_pool(db_pool).map_err(
188            |error| {
189                error!("FATAL: Failed to create shared Diesel handler: {}", error);
190                anyhow::anyhow!("Failed to create Diesel handler: {}", error)
191            },
192        )?))
193    }
194}
195
196fn run_startup_migrations_and_reconciliation(
197    shared_db: Arc<DatabaseHandler>,
198    standby_mode_active: bool,
199    skip_db_migrations: bool,
200) -> anyhow::Result<()> {
201    if !standby_mode_active && !skip_db_migrations {
202        // Run migrations once for all tables.
203        let t = Instant::now();
204        shared_db.run_migrations().map_err(|error| {
205            error!("FATAL: Failed to run database migrations: {}", error);
206            anyhow::anyhow!("Failed to run migrations: {}", error)
207        })?;
208        record_startup_timing("migrations", t.elapsed(), None);
209        return Ok(());
210    }
211
212    if skip_db_migrations {
213        info!("SKIP_DB_MIGRATIONS enabled: skipping diesel migrations");
214    }
215
216    if !standby_mode_active {
217        return Ok(());
218    }
219
220    info!("Standby mode: skipping diesel migrations (readonly pool)");
221
222    // TODO: Move this out of API/database startup. Jake and Daniel discussed
223    // making expired-instrument reconciliation engine-owned; this remains here
224    // only to preserve existing standby startup behavior during extraction.
225    let t = Instant::now();
226    let reconciled = reconcile_expired_active_instruments_for_standby_startup(shared_db.as_ref())?;
227    metrics::gauge!("ht_standby_startup_expired_active_reconciled").set(reconciled as f64);
228    if reconciled > 0 {
229        warn!(
230            reconciled,
231            "Standby startup reconciled expired ACTIVE instruments before readonly engine load"
232        );
233    }
234    record_startup_timing(
235        "standby_expired_active_reconciliation",
236        t.elapsed(),
237        Some(format!("reconciled={}", reconciled)),
238    );
239    Ok(())
240}
241
242fn reconcile_expired_active_instruments_for_standby_startup(
243    db: &DatabaseHandler,
244) -> anyhow::Result<usize> {
245    let now_secs = SystemTime::now()
246        .duration_since(UNIX_EPOCH)
247        .context("system clock is before UNIX_EPOCH")?
248        .as_secs();
249    let expired_active = db
250        .get_active_instruments_expired_by_sync(now_secs)
251        .context("failed to query expired ACTIVE instruments for standby startup reconciliation")?;
252    if expired_active.is_empty() {
253        return Ok(0);
254    }
255
256    let symbols: Vec<String> = expired_active
257        .into_iter()
258        .map(|instrument| instrument.id)
259        .collect();
260    db.transition_active_instruments_to_expired_pending_sync(&symbols)
261        .with_context(|| {
262            format!(
263                "failed to transition expired ACTIVE instruments to EXPIRED_PENDING_PRICE during standby startup reconciliation: {:?}",
264                symbols
265            )
266        })
267}
268
269fn record_startup_timing(
270    phase: &'static str,
271    elapsed: std::time::Duration,
272    detail: Option<String>,
273) {
274    match detail {
275        Some(detail) => info!("[startup-timing] {}: {:?} ({})", phase, elapsed, detail),
276        None => info!("[startup-timing] {}: {:?}", phase, elapsed),
277    }
278    metrics::gauge!("ht_startup_phase_seconds", "phase" => phase).set(elapsed.as_secs_f64());
279}
280
#[cfg(test)]
mod tests {
    use super::{extract_db_host, extract_db_name};

    /// Wraps a URL literal in the `Option<String>` shape the extractors take.
    fn url(raw: &str) -> Option<String> {
        Some(String::from(raw))
    }

    #[test]
    fn extracts_db_host_from_standard_url() {
        let database_url = url("postgres://user:pass@db.example.com:5432/hypercall");
        assert_eq!(extract_db_host(&database_url), "db.example.com");
    }

    #[test]
    fn extracts_db_name_without_query_params() {
        let database_url = url("postgres://user:pass@db.example.com:5432/hypercall?sslmode=require");
        assert_eq!(extract_db_name(&database_url), "hypercall");
    }

    #[test]
    fn reports_not_set_for_missing_database_url() {
        let missing: Option<String> = None;
        assert_eq!(extract_db_host(&missing), "not_set");
        assert_eq!(extract_db_name(&missing), "not_set");
    }

    #[test]
    fn reports_unknown_for_unparseable_database_url() {
        // Both extractors require an `@`-delimited credentials section; this
        // URL has none, so each reports "unknown".
        let database_url = url("postgres://db.example.com/hypercall");
        assert_eq!(extract_db_host(&database_url), "unknown");
        assert_eq!(extract_db_name(&database_url), "unknown");
    }
}