Skip to main content

hypercall/
api_boundary_impls.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use hypercall_api::boundary::{engine, market_inputs, read_models};
5use hypercall_types::api_models::{Instrument, Portfolio, TradingLimits, UserTierData};
6use hypercall_types::{EngineMessage, Greeks, WalletAddress};
7use rust_decimal::Decimal;
8use std::time::Instant;
9
/// Journal handle used by `persist_transaction_request` to record
/// transaction requests.
///
/// Two optional persistence paths are held. The batch sender is used whenever
/// it is present; the shared journal writer is only consulted when no batch
/// sender is configured.
#[derive(Clone)]
pub struct RuntimeTransactionRequestJournal {
    /// Channel into the journal batcher; preferred path when `Some`.
    journal_batch_sender: Option<crate::journal::JournalBatchSender>,
    /// Direct journal writer; fallback path when no batch sender is set.
    engine_journal_writer: Option<crate::journal::SharedEngineJournalWriter>,
}

impl RuntimeTransactionRequestJournal {
    /// Builds a journal handle from the two optional persistence paths.
    ///
    /// Both arguments are stored as given; no validation happens here, so a
    /// handle with both set to `None` is constructible and only fails later
    /// when a request is persisted.
    pub fn new(
        journal_batch_sender: Option<crate::journal::JournalBatchSender>,
        engine_journal_writer: Option<crate::journal::SharedEngineJournalWriter>,
    ) -> Self {
        Self {
            journal_batch_sender,
            engine_journal_writer,
        }
    }
}
27
28#[async_trait]
29impl engine::TransactionRequestJournal for RuntimeTransactionRequestJournal {
30    async fn persist_transaction_request(
31        &self,
32        timestamp: u64,
33        command_data: Vec<u8>,
34        message: EngineMessage,
35        request_uuid: uuid::Uuid,
36    ) -> anyhow::Result<()> {
37        let request_uuid = hypercall_db_diesel::engine_enums::DbUuid(request_uuid);
38        if let Some(sender) = self.journal_batch_sender.clone() {
39            let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
40            let entry = crate::journal::JournalEntry {
41                received_ts_ms: timestamp,
42                command_data,
43                response_data: None,
44                order_id: None,
45                pre_digest: hypercall_types::observability::EngineStateDigest::default(),
46                post_digest: hypercall_types::observability::EngineStateDigest::default(),
47                duration_ms: 0,
48                events: vec![crate::journal::EventPayload {
49                    event_topic: hypercall_types::topics::TOPIC_TRANSACTION_REQUESTS.to_string(),
50                    event_key: message.partition_key(),
51                    event_data: message.serialize_inner()?,
52                    l2_sequence: None,
53                    event_type_enum:
54                        hypercall_db_diesel::engine_enums::EventType::TransactionRequest,
55                }],
56                outbox_appends: Vec::new(),
57                balance_updates: Vec::new(),
58                fill_side_effects: vec![],
59                cash_withdrawal_side_effect: None,
60                created_at: Instant::now(),
61                commit_ack: Some(ack_tx),
62                request_uuid,
63                command_type_enum: None,
64                #[cfg(feature = "rsm-state")]
65                command_identity_hash: [0u8; 32],
66                #[cfg(feature = "rsm-state")]
67                rsm_state_digest: None,
68            };
69
70            let msg = crate::journal::JournalMessage::Entry(entry);
71            match sender.try_send(msg) {
72                Ok(()) => {}
73                Err(tokio::sync::mpsc::error::TrySendError::Full(msg)) => {
74                    sender.send(msg).await?;
75                }
76                Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
77                    anyhow::bail!("Journal batcher channel is closed");
78                }
79            }
80
81            if ack_rx.await.is_err() {
82                panic!(
83                    "CRITICAL_FAILURE: TransactionRequest journal commit_ack dropped for request_uuid {}. \
84                     Durability boundary is unknown.",
85                    request_uuid.0
86                );
87            }
88            return Ok(());
89        }
90
91        let journal_writer = self.engine_journal_writer.clone().ok_or_else(|| {
92            anyhow::anyhow!("Transaction request journal writer is not configured")
93        })?;
94        let message_for_journal = message.clone();
95        let append_result = tokio::task::spawn_blocking(move || {
96            journal_writer.append_transition(
97                timestamp,
98                &command_data,
99                None,
100                None,
101                &hypercall_types::observability::EngineStateDigest::default(),
102                &hypercall_types::observability::EngineStateDigest::default(),
103                0,
104                &[message_for_journal],
105                request_uuid,
106                None,
107            )
108        })
109        .await??;
110
111        if !append_result.was_new_insert {
112            anyhow::bail!("Transaction request already exists in the journal");
113        }
114
115        Ok(())
116    }
117}
118
/// Exposes `crate::read_cache::greeks::GreeksCache` through the
/// `market_inputs::GreeksCacheReader` boundary trait.
///
/// Every method forwards its arguments unchanged to a method of the same name
/// on `self` and returns the result as-is.
///
/// NOTE(review): these calls are expected to resolve to inherent `GreeksCache`
/// methods (inherent methods take precedence over trait methods). If an
/// inherent method were removed or renamed, the matching call here would
/// resolve to this trait method and recurse — confirm the inherent methods
/// exist.
#[async_trait]
impl market_inputs::GreeksCacheReader for crate::read_cache::greeks::GreeksCache {
    async fn get_greeks(&self, symbol: &str) -> anyhow::Result<Greeks> {
        self.get_greeks(symbol).await
    }

    async fn get_theoretical_price(&self, symbol: &str) -> anyhow::Result<f64> {
        self.get_theoretical_price(symbol).await
    }

    async fn get_theoretical_mark(&self, symbol: &str) -> anyhow::Result<f64> {
        self.get_theoretical_mark(symbol).await
    }

    async fn get_iv(&self, symbol: &str) -> anyhow::Result<f64> {
        self.get_iv(symbol).await
    }

    async fn get_bulk_iv(&self, symbols: &[String]) -> HashMap<String, f64> {
        self.get_bulk_iv(symbols).await
    }

    async fn get_all_iv_snapshot(&self) -> HashMap<String, f64> {
        self.get_all_iv_snapshot().await
    }

    async fn get_all_spot_prices_snapshot(&self) -> HashMap<String, f64> {
        self.get_all_spot_prices_snapshot().await
    }

    async fn get_all_prev_day_prices_snapshot(&self) -> HashMap<String, f64> {
        self.get_all_prev_day_prices_snapshot().await
    }

    async fn get_spot_price(&self, underlying: &str) -> Option<f64> {
        self.get_spot_price(underlying).await
    }

    async fn get_settlement_price(&self, underlying: &str, expiry_timestamp: i64) -> Option<f64> {
        self.get_settlement_price(underlying, expiry_timestamp)
            .await
    }

    async fn get_quote_side_ivs_from_prices(
        &self,
        symbol: &str,
        best_bid: Option<f64>,
        best_ask: Option<f64>,
    ) -> anyhow::Result<(Option<f64>, Option<f64>)> {
        self.get_quote_side_ivs_from_prices(symbol, best_bid, best_ask)
            .await
    }

    async fn has_symbol(&self, symbol: &str) -> bool {
        self.has_symbol(symbol).await
    }

    // Synchronous in the trait, so the forwarded call is not awaited.
    fn get_configured_underlyings(&self) -> Vec<String> {
        self.get_configured_underlyings()
    }

    async fn get_spot_price_staleness(&self) -> HashMap<String, Option<f64>> {
        self.get_spot_price_staleness().await
    }

    async fn get_unhealthy_oracles(&self) -> Vec<String> {
        self.get_unhealthy_oracles().await
    }

    // The two setters below are only compiled with the `test-utils` feature.
    #[cfg(feature = "test-utils")]
    async fn set_spot_price_for_testing(&self, underlying: &str, price: f64) -> bool {
        self.set_spot_price_for_testing(underlying, price).await
    }

    #[cfg(feature = "test-utils")]
    async fn set_theoretical_iv_for_testing(&self, symbol: &str, iv: f64) {
        self.set_theoretical_iv_for_testing(symbol, iv).await;
    }
}
198
/// Exposes `crate::read_cache::instruments_registry::InstrumentsCache` through
/// the `market_inputs::InstrumentsCacheReader` boundary trait.
///
/// Every method forwards its arguments unchanged to a method of the same name
/// on `self` and returns the result as-is.
///
/// NOTE(review): the forwarded calls are expected to resolve to inherent
/// `InstrumentsCache` methods (inherent methods take precedence over trait
/// methods); without a matching inherent method a call would recurse into
/// this impl — confirm they exist.
#[async_trait]
impl market_inputs::InstrumentsCacheReader
    for crate::read_cache::instruments_registry::InstrumentsCache
{
    async fn get_by_symbol(&self, symbol: &str) -> Option<Instrument> {
        self.get_by_symbol(symbol).await
    }

    // Synchronous in the trait, so the forwarded call is not awaited.
    fn allows_rfq(&self, symbol: &str) -> bool {
        self.allows_rfq(symbol)
    }

    async fn get_by_underlying_and_expiry(&self, underlying: &str, expiry: u64) -> Vec<Instrument> {
        self.get_by_underlying_and_expiry(underlying, expiry).await
    }

    async fn get_by_instrument_id(&self, instrument_id: i32) -> Option<Instrument> {
        self.get_by_instrument_id(instrument_id).await
    }

    async fn get_all(&self) -> Vec<Instrument> {
        self.get_all().await
    }

    async fn len(&self) -> usize {
        self.len().await
    }

    async fn reload_from_db(&self, db: &dyn hypercall_db::BootstrapReader) -> anyhow::Result<()> {
        self.reload_from_db(db).await
    }
}
231
/// Exposes `crate::read_cache::market_stats::MarketStatsCache` through the
/// `market_inputs::MarketStatsCacheReader` boundary trait.
///
/// NOTE(review): the forwarded call is expected to resolve to an inherent
/// `MarketStatsCache::get_all_stats` (inherent methods take precedence over
/// trait methods); without it the call would recurse — confirm it exists.
#[async_trait]
impl market_inputs::MarketStatsCacheReader for crate::read_cache::market_stats::MarketStatsCache {
    // Returns the cache's map unchanged: one `(Decimal, Decimal)` pair per
    // string key. The meaning of the pair is not visible in this file.
    async fn get_all_stats(&self) -> HashMap<String, (Decimal, Decimal)> {
        self.get_all_stats().await
    }
}
238
/// Exposes `crate::read_cache::tier::TierCache` through the
/// `read_models::TierCacheApi` boundary trait.
///
/// Every method forwards its arguments unchanged to a method of the same name
/// on `self` and returns the result as-is.
///
/// NOTE(review): the forwarded calls are expected to resolve to inherent
/// `TierCache` methods (inherent methods take precedence over trait methods);
/// without a matching inherent method a call would recurse into this impl —
/// confirm they exist.
#[async_trait]
impl read_models::TierCacheApi for crate::read_cache::tier::TierCache {
    async fn get_tier(&self, wallet: &WalletAddress) -> Option<UserTierData> {
        self.get_tier(wallet).await
    }

    async fn get_tier_record(
        &self,
        wallet: &WalletAddress,
    ) -> anyhow::Result<Option<hypercall_db::UserTierRecord>> {
        self.get_tier_record(wallet).await
    }

    async fn get_existing_margin_mode(
        &self,
        wallet: &WalletAddress,
    ) -> anyhow::Result<Option<hypercall_types::MarginMode>> {
        self.get_existing_margin_mode(wallet).await
    }

    async fn restore_tier_record(
        &self,
        wallet: &WalletAddress,
        previous_tier: Option<&hypercall_db::UserTierRecord>,
    ) -> anyhow::Result<()> {
        self.restore_tier_record(wallet, previous_tier).await
    }

    async fn set_tier(&self, new_tier: hypercall_db::UserTierUpdate) -> anyhow::Result<()> {
        self.set_tier(new_tier).await
    }

    async fn delete_tier(&self, wallet: &WalletAddress) -> anyhow::Result<()> {
        self.delete_tier(wallet).await
    }

    async fn get_margin_mode(
        &self,
        wallet: &WalletAddress,
    ) -> anyhow::Result<hypercall_types::MarginMode> {
        self.get_margin_mode(wallet).await
    }

    // Synchronous counterpart of `get_margin_mode`; forwarded without awaiting.
    fn get_margin_mode_sync(
        &self,
        wallet: &WalletAddress,
    ) -> anyhow::Result<hypercall_types::MarginMode> {
        self.get_margin_mode_sync(wallet)
    }

    // NOTE(review): the returned `i64` is presumably the version later passed
    // to `apply_margin_mode_update` — confirm against `TierCache`.
    async fn set_margin_mode(
        &self,
        wallet: &WalletAddress,
        mode: hypercall_types::MarginMode,
    ) -> anyhow::Result<i64> {
        self.set_margin_mode(wallet, mode).await
    }

    async fn apply_margin_mode_update(
        &self,
        wallet: WalletAddress,
        margin_mode: hypercall_types::MarginMode,
        version: i64,
    ) {
        self.apply_margin_mode_update(wallet, margin_mode, version)
            .await;
    }

    // Synchronous in the trait; `get_trading_limits_async` is the awaited form.
    fn get_trading_limits(&self, wallet: &WalletAddress) -> TradingLimits {
        self.get_trading_limits(wallet)
    }

    async fn get_trading_limits_async(&self, wallet: &WalletAddress) -> TradingLimits {
        self.get_trading_limits_async(wallet).await
    }
}
315
/// Exposes `crate::read_cache::mmp::MmpCache` through the
/// `read_models::MmpCacheApi` boundary trait.
///
/// Every method forwards its arguments unchanged to a method of the same name
/// on `self` and returns the result as-is.
///
/// NOTE(review): the forwarded calls are expected to resolve to inherent
/// `MmpCache` methods (inherent methods take precedence over trait methods);
/// without a matching inherent method a call would recurse into this impl —
/// confirm they exist.
#[async_trait]
impl read_models::MmpCacheApi for crate::read_cache::mmp::MmpCache {
    async fn set_config(&self, config: hypercall_db::MmpConfigRecord) -> anyhow::Result<()> {
        self.set_config(config).await
    }

    // Configs are addressed by the `(wallet, currency)` pair.
    async fn get_config(
        &self,
        wallet: &WalletAddress,
        currency: &str,
    ) -> Option<hypercall_db::MmpConfigRecord> {
        self.get_config(wallet, currency).await
    }

    async fn get_configs_for_wallet(
        &self,
        wallet: &WalletAddress,
    ) -> Vec<hypercall_db::MmpConfigRecord> {
        self.get_configs_for_wallet(wallet).await
    }

    async fn delete_config(&self, wallet: &WalletAddress, currency: &str) -> anyhow::Result<()> {
        self.delete_config(wallet, currency).await
    }

    // Returns nothing, so any outcome of the reset is not reported to callers.
    async fn reset_mmp(&self, wallet: &WalletAddress, currency: &str) {
        self.reset_mmp(wallet, currency).await;
    }
}
345
346#[async_trait]
347impl read_models::PortfolioCacheApi for crate::read_cache::portfolio::PortfolioCache {
348    async fn compute_wallet_margin_snapshot(
349        &self,
350        wallet: &WalletAddress,
351    ) -> anyhow::Result<read_models::WalletMarginSnapshot> {
352        let snapshot = self.compute_wallet_margin_snapshot(wallet).await?;
353        Ok(read_models::WalletMarginSnapshot {
354            mode: snapshot.mode,
355            span_margin: snapshot.span_margin,
356            margin_summary: snapshot.margin_summary,
357            total_margin_used: snapshot.total_margin_used,
358            available_balance: snapshot.available_balance,
359            standard_position_contributions: snapshot.standard_position_contributions,
360            standard_option_marks: snapshot.standard_option_marks,
361        })
362    }
363
364    async fn get_portfolio(&self, account: &WalletAddress) -> anyhow::Result<Portfolio> {
365        self.get_portfolio(account).await
366    }
367
368    async fn get_portfolio_fail_closed_pm(
369        &self,
370        account: &WalletAddress,
371    ) -> anyhow::Result<Portfolio> {
372        self.get_portfolio_fail_closed_pm(account).await
373    }
374
375    async fn compute_pm_risk_grid_data(
376        &self,
377        wallet: &WalletAddress,
378    ) -> anyhow::Result<read_models::PmRiskGridData> {
379        let data = self.compute_pm_risk_grid_data(wallet).await?;
380        Ok(read_models::PmRiskGridData {
381            margin_details: data.margin_details,
382            position_details: data.position_details,
383            scenario_pnls: data.scenario_pnls,
384            extended_grid: data.extended_grid,
385        })
386    }
387
388    async fn open_position_count(&self, wallet: &WalletAddress) -> usize {
389        self.get_service()
390            .get_portfolio_balance(wallet)
391            .await
392            .map(|balance| balance.positions.len())
393            .unwrap_or(0)
394    }
395
396    async fn has_live_position_symbol(&self, wallet: &WalletAddress, symbol: &str) -> bool {
397        self.has_live_position_symbol(wallet, symbol).await
398    }
399
400    async fn get_all_portfolios(&self) -> HashMap<WalletAddress, read_models::PortfolioSummary> {
401        self.get_all_portfolios()
402            .await
403            .into_iter()
404            .map(|(wallet, summary)| {
405                (
406                    wallet,
407                    read_models::PortfolioSummary {
408                        positions: summary
409                            .positions
410                            .into_iter()
411                            .map(|(symbol, position)| {
412                                (
413                                    symbol,
414                                    read_models::PositionSummary {
415                                        symbol: position.symbol,
416                                        amount: position.amount,
417                                        entry_price: position.entry_price,
418                                        realized_pnl: position.realized_pnl,
419                                        unrealized_pnl: position.unrealized_pnl,
420                                    },
421                                )
422                            })
423                            .collect(),
424                        margin_info: summary.margin_info.map(|info| read_models::MarginInfo {
425                            equity: info.equity,
426                            initial_margin: info.initial_margin,
427                            maintenance_margin: info.maintenance_margin,
428                        }),
429                    },
430                )
431            })
432            .collect()
433    }
434
435    async fn subscribe(
436        &self,
437        wallet: WalletAddress,
438    ) -> (
439        u64,
440        tokio::sync::mpsc::UnboundedReceiver<hypercall_types::ws_protocol::PortfolioUpdate>,
441    ) {
442        self.subscribe(wallet).await
443    }
444
445    async fn unsubscribe(&self, wallet: &WalletAddress, subscriber_id: u64) {
446        self.unsubscribe(wallet, subscriber_id).await;
447    }
448
449    async fn handle_engine_message(&self, message: EngineMessage, sequence: i64) {
450        self.handle_engine_message(message, sequence).await;
451    }
452
453    async fn handle_hypercore_position_update(
454        &self,
455        update: hypercall_types::HypercorePositionUpdate,
456    ) {
457        self.handle_hypercore_position_update(update).await;
458    }
459
460    async fn publish_margin_update(&self, wallet: &WalletAddress) {
461        self.publish_margin_update(wallet).await;
462    }
463}
464
/// Wrapper around a shared `crate::rsm::ledger::BalanceProvider` trait object.
///
/// Cloning copies the `Arc`, so clones share the same underlying provider.
#[derive(Clone)]
pub struct RuntimeBalanceProvider {
    /// Shared provider that balance lookups are delegated to.
    provider: std::sync::Arc<dyn crate::rsm::ledger::BalanceProvider + Send + Sync>,
}

impl RuntimeBalanceProvider {
    /// Wraps `provider` without any further setup.
    pub fn new(
        provider: std::sync::Arc<dyn crate::rsm::ledger::BalanceProvider + Send + Sync>,
    ) -> Self {
        Self { provider }
    }
}
477
478#[async_trait]
479impl read_models::BalanceProvider for RuntimeBalanceProvider {
480    async fn get_balance(&self, wallet: &WalletAddress) -> anyhow::Result<Decimal> {
481        self.provider
482            .get_balance(wallet)
483            .await
484            .map_err(|error| anyhow::anyhow!(error.to_string()))
485    }
486}
487
/// Source of balance-ledger sync snapshots.
///
/// Combines the snapshot provider with an optional balance-update publisher,
/// whose acknowledged sequence numbers are included in each snapshot.
#[derive(Clone)]
pub struct RuntimeBalanceSnapshotProvider {
    /// Supplies the balance ledger and its update sequence.
    snapshot_provider: std::sync::Arc<crate::rsm::engine_snapshot::SnapshotQuoteProvider>,
    /// Publisher queried for the last acknowledged sequences; when `None`,
    /// snapshots report those sequences as `None`.
    balance_update_publisher: Option<crate::nats::NatsBalanceUpdatePublisher>,
    /// Copied verbatim into every snapshot's `balance_update_stream_required`.
    balance_update_stream_required: bool,
}

impl RuntimeBalanceSnapshotProvider {
    /// Stores the three inputs as given; no validation is performed.
    pub fn new(
        snapshot_provider: std::sync::Arc<crate::rsm::engine_snapshot::SnapshotQuoteProvider>,
        balance_update_publisher: Option<crate::nats::NatsBalanceUpdatePublisher>,
        balance_update_stream_required: bool,
    ) -> Self {
        Self {
            snapshot_provider,
            balance_update_publisher,
            balance_update_stream_required,
        }
    }
}
508
509impl read_models::EngineBalanceSnapshotProvider for RuntimeBalanceSnapshotProvider {
510    fn balance_ledger_sync_snapshot(&self) -> read_models::BalanceLedgerSyncSnapshot {
511        let (balance_ledger, balance_update_seq) =
512            self.snapshot_provider.balance_ledger_sync_snapshot();
513        read_models::BalanceLedgerSyncSnapshot {
514            balances: balance_ledger.snapshot_map(),
515            balance_update_seq,
516            balance_update_stream_required: self.balance_update_stream_required,
517            latest_acked_balance_stream_sequence: self
518                .balance_update_publisher
519                .as_ref()
520                .map(|publisher| publisher.last_acked_stream_sequence()),
521            latest_acked_balance_update_seq: self
522                .balance_update_publisher
523                .as_ref()
524                .map(|publisher| publisher.last_acked_balance_update_seq()),
525        }
526    }
527}
528
/// Snapshot provider that holds no state and always reports an empty ledger.
#[derive(Clone, Default)]
pub struct EmptyBalanceSnapshotProvider;

impl read_models::EngineBalanceSnapshotProvider for EmptyBalanceSnapshotProvider {
    /// Returns a fixed snapshot: no balances, sequence `0`, the balance-update
    /// stream marked as not required, and no acknowledged sequences.
    fn balance_ledger_sync_snapshot(&self) -> read_models::BalanceLedgerSyncSnapshot {
        read_models::BalanceLedgerSyncSnapshot {
            balances: HashMap::new(),
            balance_update_seq: 0,
            balance_update_stream_required: false,
            latest_acked_balance_stream_sequence: None,
            latest_acked_balance_update_seq: None,
        }
    }
}
543
544impl From<crate::rsm::engine_snapshot::EngineStateDigest> for engine::EngineStateDigest {
545    fn from(value: crate::rsm::engine_snapshot::EngineStateDigest) -> Self {
546        Self {
547            l2_seq: value.l2_seq,
548            next_order_id: value.next_order_id,
549            next_trade_id: value.next_trade_id,
550            overall_digest: value.overall_digest,
551            orders_digest: value.orders_digest,
552            orders_count: value.orders_count,
553            positions_digest: value.positions_digest,
554            positions_count: value.positions_count,
555            cash_digest: value.cash_digest,
556            cash_wallet_count: value.cash_wallet_count,
557            markets_digest: value.markets_digest,
558            expired_instruments_count: value.expired_instruments_count,
559            trading_modes_count: value.trading_modes_count,
560            prices_digest: value.prices_digest,
561            spot_price_count: value.spot_price_count,
562            iv_surface_count: value.iv_surface_count,
563            iv_source_timestamps_digest: value.iv_source_timestamps_digest,
564            iv_source_timestamp_count: value.iv_source_timestamp_count,
565            perp_positions_digest: value.perp_positions_digest,
566            perp_positions_count: value.perp_positions_count,
567            hypercore_equity_digest: value.hypercore_equity_digest,
568            hypercore_equity_count: value.hypercore_equity_count,
569            mmp_digest: value.mmp_digest,
570            mmp_state_count: value.mmp_state_count,
571            mmp_enabled_digest: value.mmp_enabled_digest,
572            mmp_enabled_count: value.mmp_enabled_count,
573            liquidation_states_digest: value.liquidation_states_digest,
574            liquidation_state_count: value.liquidation_state_count,
575            wallet_margin_modes_digest: value.wallet_margin_modes_digest,
576            wallet_margin_mode_count: value.wallet_margin_mode_count,
577            wallet_trading_limits_digest: value.wallet_trading_limits_digest,
578            wallet_trading_limits_count: value.wallet_trading_limits_count,
579            wallet_tiers_digest: value.wallet_tiers_digest,
580            wallet_tier_count: value.wallet_tier_count,
581            deposit_watermarks_digest: value.deposit_watermarks_digest,
582            deposit_watermark_count: value.deposit_watermark_count,
583            agent_auth_digest: value.agent_auth_digest,
584            agent_auth_count: value.agent_auth_count,
585            nonce_sets_digest: value.nonce_sets_digest,
586            nonce_signer_count: value.nonce_signer_count,
587        }
588    }
589}
590
/// Wrapper around a shared
/// `crate::rsm::engine_snapshot::EngineStateDigestProvider` trait object.
///
/// Cloning copies the `Arc`, so clones share the same underlying provider.
#[derive(Clone)]
pub struct RuntimeEngineStateDigestProvider {
    /// Shared provider that digest requests are delegated to.
    provider: std::sync::Arc<dyn crate::rsm::engine_snapshot::EngineStateDigestProvider>,
}

impl RuntimeEngineStateDigestProvider {
    /// Wraps `provider` without any further setup.
    pub fn new(
        provider: std::sync::Arc<dyn crate::rsm::engine_snapshot::EngineStateDigestProvider>,
    ) -> Self {
        Self { provider }
    }
}
603
604impl engine::EngineStateDigestProvider for RuntimeEngineStateDigestProvider {
605    fn engine_state_digest(&self) -> engine::EngineStateDigest {
606        self.provider.engine_state_digest().into()
607    }
608}
609
/// Read-side handle over the shared engine journal writer.
#[derive(Clone)]
pub struct RuntimeEngineJournalReader {
    /// Journal handle that the lookups are delegated to.
    writer: crate::journal::SharedEngineJournalWriter,
}

impl RuntimeEngineJournalReader {
    /// Wraps `writer` without any further setup.
    pub fn new(writer: crate::journal::SharedEngineJournalWriter) -> Self {
        Self { writer }
    }
}
620
621impl engine::EngineJournalReader for RuntimeEngineJournalReader {
622    fn get_recent(&self, limit: usize) -> anyhow::Result<Vec<hypercall_db::JournalCommandSummary>> {
623        self.writer.get_recent(limit).map_err(Into::into)
624    }
625
626    fn get_by_request_id(
627        &self,
628        request_id: &uuid::Uuid,
629    ) -> anyhow::Result<Option<hypercall_db::JournalFullRecord>> {
630        self.writer
631            .get_by_request_id(request_id)
632            .map_err(Into::into)
633    }
634}