1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use hypercall_api::boundary::{engine, market_inputs, read_models};
5use hypercall_types::api_models::{Instrument, Portfolio, TradingLimits, UserTierData};
6use hypercall_types::{EngineMessage, Greeks, WalletAddress};
7use rust_decimal::Decimal;
8use std::time::Instant;
9
10#[derive(Clone)]
11pub struct RuntimeTransactionRequestJournal {
12 journal_batch_sender: Option<crate::journal::JournalBatchSender>,
13 engine_journal_writer: Option<crate::journal::SharedEngineJournalWriter>,
14}
15
16impl RuntimeTransactionRequestJournal {
17 pub fn new(
18 journal_batch_sender: Option<crate::journal::JournalBatchSender>,
19 engine_journal_writer: Option<crate::journal::SharedEngineJournalWriter>,
20 ) -> Self {
21 Self {
22 journal_batch_sender,
23 engine_journal_writer,
24 }
25 }
26}
27
28#[async_trait]
29impl engine::TransactionRequestJournal for RuntimeTransactionRequestJournal {
30 async fn persist_transaction_request(
31 &self,
32 timestamp: u64,
33 command_data: Vec<u8>,
34 message: EngineMessage,
35 request_uuid: uuid::Uuid,
36 ) -> anyhow::Result<()> {
37 let request_uuid = hypercall_db_diesel::engine_enums::DbUuid(request_uuid);
38 if let Some(sender) = self.journal_batch_sender.clone() {
39 let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
40 let entry = crate::journal::JournalEntry {
41 received_ts_ms: timestamp,
42 command_data,
43 response_data: None,
44 order_id: None,
45 pre_digest: hypercall_types::observability::EngineStateDigest::default(),
46 post_digest: hypercall_types::observability::EngineStateDigest::default(),
47 duration_ms: 0,
48 events: vec![crate::journal::EventPayload {
49 event_topic: hypercall_types::topics::TOPIC_TRANSACTION_REQUESTS.to_string(),
50 event_key: message.partition_key(),
51 event_data: message.serialize_inner()?,
52 l2_sequence: None,
53 event_type_enum:
54 hypercall_db_diesel::engine_enums::EventType::TransactionRequest,
55 }],
56 outbox_appends: Vec::new(),
57 balance_updates: Vec::new(),
58 fill_side_effects: vec![],
59 cash_withdrawal_side_effect: None,
60 created_at: Instant::now(),
61 commit_ack: Some(ack_tx),
62 request_uuid,
63 command_type_enum: None,
64 #[cfg(feature = "rsm-state")]
65 command_identity_hash: [0u8; 32],
66 #[cfg(feature = "rsm-state")]
67 rsm_state_digest: None,
68 };
69
70 let msg = crate::journal::JournalMessage::Entry(entry);
71 match sender.try_send(msg) {
72 Ok(()) => {}
73 Err(tokio::sync::mpsc::error::TrySendError::Full(msg)) => {
74 sender.send(msg).await?;
75 }
76 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
77 anyhow::bail!("Journal batcher channel is closed");
78 }
79 }
80
81 if ack_rx.await.is_err() {
82 panic!(
83 "CRITICAL_FAILURE: TransactionRequest journal commit_ack dropped for request_uuid {}. \
84 Durability boundary is unknown.",
85 request_uuid.0
86 );
87 }
88 return Ok(());
89 }
90
91 let journal_writer = self.engine_journal_writer.clone().ok_or_else(|| {
92 anyhow::anyhow!("Transaction request journal writer is not configured")
93 })?;
94 let message_for_journal = message.clone();
95 let append_result = tokio::task::spawn_blocking(move || {
96 journal_writer.append_transition(
97 timestamp,
98 &command_data,
99 None,
100 None,
101 &hypercall_types::observability::EngineStateDigest::default(),
102 &hypercall_types::observability::EngineStateDigest::default(),
103 0,
104 &[message_for_journal],
105 request_uuid,
106 None,
107 )
108 })
109 .await??;
110
111 if !append_result.was_new_insert {
112 anyhow::bail!("Transaction request already exists in the journal");
113 }
114
115 Ok(())
116 }
117}
118
119#[async_trait]
120impl market_inputs::GreeksCacheReader for crate::read_cache::greeks::GreeksCache {
121 async fn get_greeks(&self, symbol: &str) -> anyhow::Result<Greeks> {
122 self.get_greeks(symbol).await
123 }
124
125 async fn get_theoretical_price(&self, symbol: &str) -> anyhow::Result<f64> {
126 self.get_theoretical_price(symbol).await
127 }
128
129 async fn get_theoretical_mark(&self, symbol: &str) -> anyhow::Result<f64> {
130 self.get_theoretical_mark(symbol).await
131 }
132
133 async fn get_iv(&self, symbol: &str) -> anyhow::Result<f64> {
134 self.get_iv(symbol).await
135 }
136
137 async fn get_bulk_iv(&self, symbols: &[String]) -> HashMap<String, f64> {
138 self.get_bulk_iv(symbols).await
139 }
140
141 async fn get_all_iv_snapshot(&self) -> HashMap<String, f64> {
142 self.get_all_iv_snapshot().await
143 }
144
145 async fn get_all_spot_prices_snapshot(&self) -> HashMap<String, f64> {
146 self.get_all_spot_prices_snapshot().await
147 }
148
149 async fn get_all_prev_day_prices_snapshot(&self) -> HashMap<String, f64> {
150 self.get_all_prev_day_prices_snapshot().await
151 }
152
153 async fn get_spot_price(&self, underlying: &str) -> Option<f64> {
154 self.get_spot_price(underlying).await
155 }
156
157 async fn get_settlement_price(&self, underlying: &str, expiry_timestamp: i64) -> Option<f64> {
158 self.get_settlement_price(underlying, expiry_timestamp)
159 .await
160 }
161
162 async fn get_quote_side_ivs_from_prices(
163 &self,
164 symbol: &str,
165 best_bid: Option<f64>,
166 best_ask: Option<f64>,
167 ) -> anyhow::Result<(Option<f64>, Option<f64>)> {
168 self.get_quote_side_ivs_from_prices(symbol, best_bid, best_ask)
169 .await
170 }
171
172 async fn has_symbol(&self, symbol: &str) -> bool {
173 self.has_symbol(symbol).await
174 }
175
176 fn get_configured_underlyings(&self) -> Vec<String> {
177 self.get_configured_underlyings()
178 }
179
180 async fn get_spot_price_staleness(&self) -> HashMap<String, Option<f64>> {
181 self.get_spot_price_staleness().await
182 }
183
184 async fn get_unhealthy_oracles(&self) -> Vec<String> {
185 self.get_unhealthy_oracles().await
186 }
187
188 #[cfg(feature = "test-utils")]
189 async fn set_spot_price_for_testing(&self, underlying: &str, price: f64) -> bool {
190 self.set_spot_price_for_testing(underlying, price).await
191 }
192
193 #[cfg(feature = "test-utils")]
194 async fn set_theoretical_iv_for_testing(&self, symbol: &str, iv: f64) {
195 self.set_theoretical_iv_for_testing(symbol, iv).await;
196 }
197}
198
199#[async_trait]
200impl market_inputs::InstrumentsCacheReader
201 for crate::read_cache::instruments_registry::InstrumentsCache
202{
203 async fn get_by_symbol(&self, symbol: &str) -> Option<Instrument> {
204 self.get_by_symbol(symbol).await
205 }
206
207 fn allows_rfq(&self, symbol: &str) -> bool {
208 self.allows_rfq(symbol)
209 }
210
211 async fn get_by_underlying_and_expiry(&self, underlying: &str, expiry: u64) -> Vec<Instrument> {
212 self.get_by_underlying_and_expiry(underlying, expiry).await
213 }
214
215 async fn get_by_instrument_id(&self, instrument_id: i32) -> Option<Instrument> {
216 self.get_by_instrument_id(instrument_id).await
217 }
218
219 async fn get_all(&self) -> Vec<Instrument> {
220 self.get_all().await
221 }
222
223 async fn len(&self) -> usize {
224 self.len().await
225 }
226
227 async fn reload_from_db(&self, db: &dyn hypercall_db::BootstrapReader) -> anyhow::Result<()> {
228 self.reload_from_db(db).await
229 }
230}
231
232#[async_trait]
233impl market_inputs::MarketStatsCacheReader for crate::read_cache::market_stats::MarketStatsCache {
234 async fn get_all_stats(&self) -> HashMap<String, (Decimal, Decimal)> {
235 self.get_all_stats().await
236 }
237}
238
239#[async_trait]
240impl read_models::TierCacheApi for crate::read_cache::tier::TierCache {
241 async fn get_tier(&self, wallet: &WalletAddress) -> Option<UserTierData> {
242 self.get_tier(wallet).await
243 }
244
245 async fn get_tier_record(
246 &self,
247 wallet: &WalletAddress,
248 ) -> anyhow::Result<Option<hypercall_db::UserTierRecord>> {
249 self.get_tier_record(wallet).await
250 }
251
252 async fn get_existing_margin_mode(
253 &self,
254 wallet: &WalletAddress,
255 ) -> anyhow::Result<Option<hypercall_types::MarginMode>> {
256 self.get_existing_margin_mode(wallet).await
257 }
258
259 async fn restore_tier_record(
260 &self,
261 wallet: &WalletAddress,
262 previous_tier: Option<&hypercall_db::UserTierRecord>,
263 ) -> anyhow::Result<()> {
264 self.restore_tier_record(wallet, previous_tier).await
265 }
266
267 async fn set_tier(&self, new_tier: hypercall_db::UserTierUpdate) -> anyhow::Result<()> {
268 self.set_tier(new_tier).await
269 }
270
271 async fn delete_tier(&self, wallet: &WalletAddress) -> anyhow::Result<()> {
272 self.delete_tier(wallet).await
273 }
274
275 async fn get_margin_mode(
276 &self,
277 wallet: &WalletAddress,
278 ) -> anyhow::Result<hypercall_types::MarginMode> {
279 self.get_margin_mode(wallet).await
280 }
281
282 fn get_margin_mode_sync(
283 &self,
284 wallet: &WalletAddress,
285 ) -> anyhow::Result<hypercall_types::MarginMode> {
286 self.get_margin_mode_sync(wallet)
287 }
288
289 async fn set_margin_mode(
290 &self,
291 wallet: &WalletAddress,
292 mode: hypercall_types::MarginMode,
293 ) -> anyhow::Result<i64> {
294 self.set_margin_mode(wallet, mode).await
295 }
296
297 async fn apply_margin_mode_update(
298 &self,
299 wallet: WalletAddress,
300 margin_mode: hypercall_types::MarginMode,
301 version: i64,
302 ) {
303 self.apply_margin_mode_update(wallet, margin_mode, version)
304 .await;
305 }
306
307 fn get_trading_limits(&self, wallet: &WalletAddress) -> TradingLimits {
308 self.get_trading_limits(wallet)
309 }
310
311 async fn get_trading_limits_async(&self, wallet: &WalletAddress) -> TradingLimits {
312 self.get_trading_limits_async(wallet).await
313 }
314}
315
316#[async_trait]
317impl read_models::MmpCacheApi for crate::read_cache::mmp::MmpCache {
318 async fn set_config(&self, config: hypercall_db::MmpConfigRecord) -> anyhow::Result<()> {
319 self.set_config(config).await
320 }
321
322 async fn get_config(
323 &self,
324 wallet: &WalletAddress,
325 currency: &str,
326 ) -> Option<hypercall_db::MmpConfigRecord> {
327 self.get_config(wallet, currency).await
328 }
329
330 async fn get_configs_for_wallet(
331 &self,
332 wallet: &WalletAddress,
333 ) -> Vec<hypercall_db::MmpConfigRecord> {
334 self.get_configs_for_wallet(wallet).await
335 }
336
337 async fn delete_config(&self, wallet: &WalletAddress, currency: &str) -> anyhow::Result<()> {
338 self.delete_config(wallet, currency).await
339 }
340
341 async fn reset_mmp(&self, wallet: &WalletAddress, currency: &str) {
342 self.reset_mmp(wallet, currency).await;
343 }
344}
345
346#[async_trait]
347impl read_models::PortfolioCacheApi for crate::read_cache::portfolio::PortfolioCache {
348 async fn compute_wallet_margin_snapshot(
349 &self,
350 wallet: &WalletAddress,
351 ) -> anyhow::Result<read_models::WalletMarginSnapshot> {
352 let snapshot = self.compute_wallet_margin_snapshot(wallet).await?;
353 Ok(read_models::WalletMarginSnapshot {
354 mode: snapshot.mode,
355 span_margin: snapshot.span_margin,
356 margin_summary: snapshot.margin_summary,
357 total_margin_used: snapshot.total_margin_used,
358 available_balance: snapshot.available_balance,
359 standard_position_contributions: snapshot.standard_position_contributions,
360 standard_option_marks: snapshot.standard_option_marks,
361 })
362 }
363
364 async fn get_portfolio(&self, account: &WalletAddress) -> anyhow::Result<Portfolio> {
365 self.get_portfolio(account).await
366 }
367
368 async fn get_portfolio_fail_closed_pm(
369 &self,
370 account: &WalletAddress,
371 ) -> anyhow::Result<Portfolio> {
372 self.get_portfolio_fail_closed_pm(account).await
373 }
374
375 async fn compute_pm_risk_grid_data(
376 &self,
377 wallet: &WalletAddress,
378 ) -> anyhow::Result<read_models::PmRiskGridData> {
379 let data = self.compute_pm_risk_grid_data(wallet).await?;
380 Ok(read_models::PmRiskGridData {
381 margin_details: data.margin_details,
382 position_details: data.position_details,
383 scenario_pnls: data.scenario_pnls,
384 extended_grid: data.extended_grid,
385 })
386 }
387
388 async fn open_position_count(&self, wallet: &WalletAddress) -> usize {
389 self.get_service()
390 .get_portfolio_balance(wallet)
391 .await
392 .map(|balance| balance.positions.len())
393 .unwrap_or(0)
394 }
395
396 async fn has_live_position_symbol(&self, wallet: &WalletAddress, symbol: &str) -> bool {
397 self.has_live_position_symbol(wallet, symbol).await
398 }
399
400 async fn get_all_portfolios(&self) -> HashMap<WalletAddress, read_models::PortfolioSummary> {
401 self.get_all_portfolios()
402 .await
403 .into_iter()
404 .map(|(wallet, summary)| {
405 (
406 wallet,
407 read_models::PortfolioSummary {
408 positions: summary
409 .positions
410 .into_iter()
411 .map(|(symbol, position)| {
412 (
413 symbol,
414 read_models::PositionSummary {
415 symbol: position.symbol,
416 amount: position.amount,
417 entry_price: position.entry_price,
418 realized_pnl: position.realized_pnl,
419 unrealized_pnl: position.unrealized_pnl,
420 },
421 )
422 })
423 .collect(),
424 margin_info: summary.margin_info.map(|info| read_models::MarginInfo {
425 equity: info.equity,
426 initial_margin: info.initial_margin,
427 maintenance_margin: info.maintenance_margin,
428 }),
429 },
430 )
431 })
432 .collect()
433 }
434
435 async fn subscribe(
436 &self,
437 wallet: WalletAddress,
438 ) -> (
439 u64,
440 tokio::sync::mpsc::UnboundedReceiver<hypercall_types::ws_protocol::PortfolioUpdate>,
441 ) {
442 self.subscribe(wallet).await
443 }
444
445 async fn unsubscribe(&self, wallet: &WalletAddress, subscriber_id: u64) {
446 self.unsubscribe(wallet, subscriber_id).await;
447 }
448
449 async fn handle_engine_message(&self, message: EngineMessage, sequence: i64) {
450 self.handle_engine_message(message, sequence).await;
451 }
452
453 async fn handle_hypercore_position_update(
454 &self,
455 update: hypercall_types::HypercorePositionUpdate,
456 ) {
457 self.handle_hypercore_position_update(update).await;
458 }
459
460 async fn publish_margin_update(&self, wallet: &WalletAddress) {
461 self.publish_margin_update(wallet).await;
462 }
463}
464
465#[derive(Clone)]
466pub struct RuntimeBalanceProvider {
467 provider: std::sync::Arc<dyn crate::rsm::ledger::BalanceProvider + Send + Sync>,
468}
469
470impl RuntimeBalanceProvider {
471 pub fn new(
472 provider: std::sync::Arc<dyn crate::rsm::ledger::BalanceProvider + Send + Sync>,
473 ) -> Self {
474 Self { provider }
475 }
476}
477
478#[async_trait]
479impl read_models::BalanceProvider for RuntimeBalanceProvider {
480 async fn get_balance(&self, wallet: &WalletAddress) -> anyhow::Result<Decimal> {
481 self.provider
482 .get_balance(wallet)
483 .await
484 .map_err(|error| anyhow::anyhow!(error.to_string()))
485 }
486}
487
488#[derive(Clone)]
489pub struct RuntimeBalanceSnapshotProvider {
490 snapshot_provider: std::sync::Arc<crate::rsm::engine_snapshot::SnapshotQuoteProvider>,
491 balance_update_publisher: Option<crate::nats::NatsBalanceUpdatePublisher>,
492 balance_update_stream_required: bool,
493}
494
495impl RuntimeBalanceSnapshotProvider {
496 pub fn new(
497 snapshot_provider: std::sync::Arc<crate::rsm::engine_snapshot::SnapshotQuoteProvider>,
498 balance_update_publisher: Option<crate::nats::NatsBalanceUpdatePublisher>,
499 balance_update_stream_required: bool,
500 ) -> Self {
501 Self {
502 snapshot_provider,
503 balance_update_publisher,
504 balance_update_stream_required,
505 }
506 }
507}
508
509impl read_models::EngineBalanceSnapshotProvider for RuntimeBalanceSnapshotProvider {
510 fn balance_ledger_sync_snapshot(&self) -> read_models::BalanceLedgerSyncSnapshot {
511 let (balance_ledger, balance_update_seq) =
512 self.snapshot_provider.balance_ledger_sync_snapshot();
513 read_models::BalanceLedgerSyncSnapshot {
514 balances: balance_ledger.snapshot_map(),
515 balance_update_seq,
516 balance_update_stream_required: self.balance_update_stream_required,
517 latest_acked_balance_stream_sequence: self
518 .balance_update_publisher
519 .as_ref()
520 .map(|publisher| publisher.last_acked_stream_sequence()),
521 latest_acked_balance_update_seq: self
522 .balance_update_publisher
523 .as_ref()
524 .map(|publisher| publisher.last_acked_balance_update_seq()),
525 }
526 }
527}
528
529#[derive(Clone, Default)]
530pub struct EmptyBalanceSnapshotProvider;
531
532impl read_models::EngineBalanceSnapshotProvider for EmptyBalanceSnapshotProvider {
533 fn balance_ledger_sync_snapshot(&self) -> read_models::BalanceLedgerSyncSnapshot {
534 read_models::BalanceLedgerSyncSnapshot {
535 balances: HashMap::new(),
536 balance_update_seq: 0,
537 balance_update_stream_required: false,
538 latest_acked_balance_stream_sequence: None,
539 latest_acked_balance_update_seq: None,
540 }
541 }
542}
543
544impl From<crate::rsm::engine_snapshot::EngineStateDigest> for engine::EngineStateDigest {
545 fn from(value: crate::rsm::engine_snapshot::EngineStateDigest) -> Self {
546 Self {
547 l2_seq: value.l2_seq,
548 next_order_id: value.next_order_id,
549 next_trade_id: value.next_trade_id,
550 overall_digest: value.overall_digest,
551 orders_digest: value.orders_digest,
552 orders_count: value.orders_count,
553 positions_digest: value.positions_digest,
554 positions_count: value.positions_count,
555 cash_digest: value.cash_digest,
556 cash_wallet_count: value.cash_wallet_count,
557 markets_digest: value.markets_digest,
558 expired_instruments_count: value.expired_instruments_count,
559 trading_modes_count: value.trading_modes_count,
560 prices_digest: value.prices_digest,
561 spot_price_count: value.spot_price_count,
562 iv_surface_count: value.iv_surface_count,
563 iv_source_timestamps_digest: value.iv_source_timestamps_digest,
564 iv_source_timestamp_count: value.iv_source_timestamp_count,
565 perp_positions_digest: value.perp_positions_digest,
566 perp_positions_count: value.perp_positions_count,
567 hypercore_equity_digest: value.hypercore_equity_digest,
568 hypercore_equity_count: value.hypercore_equity_count,
569 mmp_digest: value.mmp_digest,
570 mmp_state_count: value.mmp_state_count,
571 mmp_enabled_digest: value.mmp_enabled_digest,
572 mmp_enabled_count: value.mmp_enabled_count,
573 liquidation_states_digest: value.liquidation_states_digest,
574 liquidation_state_count: value.liquidation_state_count,
575 wallet_margin_modes_digest: value.wallet_margin_modes_digest,
576 wallet_margin_mode_count: value.wallet_margin_mode_count,
577 wallet_trading_limits_digest: value.wallet_trading_limits_digest,
578 wallet_trading_limits_count: value.wallet_trading_limits_count,
579 wallet_tiers_digest: value.wallet_tiers_digest,
580 wallet_tier_count: value.wallet_tier_count,
581 deposit_watermarks_digest: value.deposit_watermarks_digest,
582 deposit_watermark_count: value.deposit_watermark_count,
583 agent_auth_digest: value.agent_auth_digest,
584 agent_auth_count: value.agent_auth_count,
585 nonce_sets_digest: value.nonce_sets_digest,
586 nonce_signer_count: value.nonce_signer_count,
587 }
588 }
589}
590
591#[derive(Clone)]
592pub struct RuntimeEngineStateDigestProvider {
593 provider: std::sync::Arc<dyn crate::rsm::engine_snapshot::EngineStateDigestProvider>,
594}
595
596impl RuntimeEngineStateDigestProvider {
597 pub fn new(
598 provider: std::sync::Arc<dyn crate::rsm::engine_snapshot::EngineStateDigestProvider>,
599 ) -> Self {
600 Self { provider }
601 }
602}
603
604impl engine::EngineStateDigestProvider for RuntimeEngineStateDigestProvider {
605 fn engine_state_digest(&self) -> engine::EngineStateDigest {
606 self.provider.engine_state_digest().into()
607 }
608}
609
610#[derive(Clone)]
611pub struct RuntimeEngineJournalReader {
612 writer: crate::journal::SharedEngineJournalWriter,
613}
614
615impl RuntimeEngineJournalReader {
616 pub fn new(writer: crate::journal::SharedEngineJournalWriter) -> Self {
617 Self { writer }
618 }
619}
620
621impl engine::EngineJournalReader for RuntimeEngineJournalReader {
622 fn get_recent(&self, limit: usize) -> anyhow::Result<Vec<hypercall_db::JournalCommandSummary>> {
623 self.writer.get_recent(limit).map_err(Into::into)
624 }
625
626 fn get_by_request_id(
627 &self,
628 request_id: &uuid::Uuid,
629 ) -> anyhow::Result<Option<hypercall_db::JournalFullRecord>> {
630 self.writer
631 .get_by_request_id(request_id)
632 .map_err(Into::into)
633 }
634}