Skip to main content

hypercall_api/websocket/
event_forwarder.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use metrics::{counter, histogram};
5use tokio::sync::mpsc;
6use tracing::{info, warn};
7
8use hypercall_types::{to_human_readable_decimal, WalletAddress};
9
10use super::{PubSubManager, WsFillUpdate, WsLiquidationStateChange, WsOptionsChainUpdate};
11use crate::boundary::market_inputs::GreeksCacheReader;
12use crate::boundary::market_inputs::InstrumentsCacheReader;
13use crate::boundary::read_models::PortfolioCacheApi;
14use crate::boundary::read_models::TierCacheApi;
15use crate::notification_service::NotificationService;
16use crate::options_chain;
17use crate::push_service::PushNotificationService;
18use hypercall_runtime_api::QuoteProvider;
19use hypercall_runtime_api::{EngineEventPersistence, Shutdown};
20use hypercall_types::ws_protocol::{WsOrderMessage, WsOrderRequest};
21use hypercall_types::{EngineMessage, OrderUpdateStatus};
22
23pub struct WsEventForwarderDeps {
24    pub pubsub: Arc<PubSubManager>,
25    pub portfolio_cache: Arc<dyn PortfolioCacheApi>,
26    pub instruments_cache: Arc<dyn InstrumentsCacheReader>,
27    pub quote_provider: Arc<dyn QuoteProvider>,
28    pub greeks_cache: Arc<dyn GreeksCacheReader>,
29    pub tier_cache: Arc<dyn TierCacheApi>,
30    pub push_service: Option<Arc<PushNotificationService>>,
31    pub db: Arc<dyn EngineEventPersistence>,
32    pub engine_event_tx: mpsc::UnboundedSender<EngineMessage>,
33    pub notification_service: Arc<NotificationService>,
34    pub shutdown: Shutdown,
35}
36
37pub async fn run(
38    deps: WsEventForwarderDeps,
39    mut ws_receiver: mpsc::UnboundedReceiver<EngineMessage>,
40) -> anyhow::Result<()> {
41    let mut shutdown_rx = deps.shutdown.subscribe();
42
43    loop {
44        let event = tokio::select! {
45            _ = shutdown_rx.recv() => {
46                info!("WsEventForwarder received shutdown signal");
47                break;
48            },
49            maybe_event = ws_receiver.recv() => {
50                match maybe_event {
51                    Some(e) => e,
52                    None => break,
53                }
54            }
55        };
56
57        let process_start = Instant::now();
58        let event_type = event_type_label(&event);
59
60        tracing::trace!(event_type, "WsEventForwarder received event");
61
62        match event {
63            EngineMessage::OrderAction(_)
64            | EngineMessage::MarketAction(_)
65            | EngineMessage::OrderInfo(_) => {
66                continue;
67            }
68            EngineMessage::OrderUpdate(update_msg) => {
69                if update_msg.status == OrderUpdateStatus::PartiallyFilled {
70                    continue;
71                }
72
73                let request = WsOrderRequest {
74                    price: update_msg.info.price,
75                    size: to_human_readable_decimal(&update_msg.info.symbol, update_msg.info.size),
76                    symbol: update_msg.info.symbol,
77                    side: update_msg.info.side,
78                    tif: update_msg.info.tif,
79                };
80
81                let msg = WsOrderMessage {
82                    order_id: update_msg.order_id,
83                    request,
84                    status: update_msg.status,
85                    timestamp: update_msg.timestamp,
86                    reason: update_msg.reason,
87                    wallet_address: update_msg.wallet_address,
88                    instrument_type: if update_msg.info.is_perp {
89                        "perp".to_string()
90                    } else {
91                        "option".to_string()
92                    },
93                };
94                deps.pubsub.publish_order_update(msg);
95            }
96            EngineMessage::MarketUpdate(update_msg) => {
97                handle_market_update(&deps, update_msg).await;
98            }
99            EngineMessage::OrderFilled { ref fill, .. } => {
100                if handle_order_filled(&deps, fill).await.is_err() {
101                    break;
102                }
103            }
104            EngineMessage::OrderbookUpdated(orderbook_update) => {
105                handle_orderbook_updated(&deps, &orderbook_update).await;
106            }
107            EngineMessage::L2Update(_) => {
108                continue;
109            }
110            EngineMessage::Trade(trade_msg) => {
111                use super::WsTradeUpdate;
112                let size_human = to_human_readable_decimal(&trade_msg.symbol, trade_msg.size);
113                let trade_update = WsTradeUpdate {
114                    symbol: trade_msg.symbol.clone(),
115                    price: trade_msg.price,
116                    size: size_human,
117                    side: format!("{:?}", trade_msg.side),
118                    timestamp: trade_msg.timestamp as i64,
119                };
120                deps.pubsub.publish_trade(trade_update);
121            }
122            EngineMessage::TransactionRequest(tx_req) => {
123                tracing::info!(
124                    "Received transaction request for processing: {}",
125                    tx_req.request_id
126                );
127            }
128            EngineMessage::TransactionUpdate(tx_update) => {
129                tracing::info!(
130                    "Transaction update: {} -> {:?}",
131                    tx_update.request_id,
132                    tx_update.status
133                );
134            }
135            EngineMessage::MmpTriggered(mmp_msg) => {
136                tracing::warn!(
137                    "MMP Triggered - wallet={}, currency={}, reason={}",
138                    mmp_msg.wallet,
139                    mmp_msg.currency,
140                    mmp_msg.reason
141                );
142            }
143            EngineMessage::PositionExpired(expiry_msg) => {
144                handle_position_expired(&deps, expiry_msg).await;
145            }
146            EngineMessage::TierUpdate(tier_update) => {
147                if let Ok(mode) = tier_update
148                    .margin_mode
149                    .parse::<hypercall_types::MarginMode>()
150                {
151                    deps.tier_cache
152                        .apply_margin_mode_update(tier_update.wallet, mode, tier_update.version)
153                        .await;
154                } else {
155                    tracing::warn!(
156                        "Invalid margin mode '{}' in tier update for wallet {}",
157                        tier_update.margin_mode,
158                        tier_update.wallet
159                    );
160                }
161            }
162            EngineMessage::HypercorePositionUpdate(update) => {
163                deps.portfolio_cache
164                    .handle_hypercore_position_update(update)
165                    .await;
166            }
167            EngineMessage::LiquidationStateChange(liq_msg) => {
168                if handle_liquidation_state_change(&deps, &liq_msg)
169                    .await
170                    .is_err()
171                {
172                    break;
173                }
174            }
175            EngineMessage::RfqFilled(msg) => {
176                tracing::info!(
177                    "RFQ filled: rfq_id={}, taker={}, qp={}, premium={}",
178                    msg.rfq_id,
179                    msg.taker_wallet,
180                    msg.qp_wallet,
181                    msg.net_premium
182                );
183                metrics::counter!("ht_rfq_fills_total").increment(1);
184            }
185        }
186
187        let process_ms = process_start.elapsed().as_secs_f64() * 1000.0;
188        histogram!("ht_ws_forwarder_process_ms", "event_type" => event_type.to_string())
189            .record(process_ms);
190        counter!("ht_ws_forwarder_events_processed", "event_type" => event_type.to_string())
191            .increment(1);
192
193        if process_ms > 50.0 {
194            tracing::warn!(
195                event_type = %event_type,
196                process_ms = %process_ms,
197                "Slow WsEventForwarder processing"
198            );
199        }
200    }
201    Ok(())
202}
203
204fn event_type_label(event: &EngineMessage) -> &'static str {
205    match event {
206        EngineMessage::OrderAction(_) => "OrderAction",
207        EngineMessage::OrderUpdate(_) => "OrderUpdate",
208        EngineMessage::OrderInfo(_) => "OrderInfo",
209        EngineMessage::MarketAction(_) => "MarketAction",
210        EngineMessage::MarketUpdate(_) => "MarketUpdate",
211        EngineMessage::OrderFilled { .. } => "OrderFilled",
212        EngineMessage::OrderbookUpdated(_) => "OrderbookUpdated",
213        EngineMessage::L2Update(_) => "L2Update",
214        EngineMessage::Trade(_) => "Trade",
215        EngineMessage::TransactionRequest(_) => "TransactionRequest",
216        EngineMessage::TransactionUpdate(_) => "TransactionUpdate",
217        EngineMessage::MmpTriggered(_) => "MmpTriggered",
218        EngineMessage::PositionExpired(_) => "PositionExpired",
219        EngineMessage::TierUpdate(_) => "TierUpdate",
220        EngineMessage::HypercorePositionUpdate(_) => "HypercorePositionUpdate",
221        EngineMessage::LiquidationStateChange(_) => "LiquidationStateChange",
222        EngineMessage::RfqFilled(_) => "RfqFilled",
223    }
224}
225
226async fn handle_market_update(
227    deps: &WsEventForwarderDeps,
228    update_msg: hypercall_types::MarketUpdateMessage,
229) {
230    use super::WsMarketUpdate;
231    use hypercall_types::{MarketUpdateStatus, OptionType};
232
233    let market = update_msg.market.clone();
234    let ws_update = match update_msg.status {
235        MarketUpdateStatus::MarketCreated | MarketUpdateStatus::MarketAlreadyExists => {
236            WsMarketUpdate::Created {
237                symbol: market.symbol.clone(),
238                strike: market.strike,
239                is_call: market.option_type == OptionType::Call,
240                underlying: market.underlying.clone(),
241                expiry: market.expiry as u32,
242                timestamp: update_msg.timestamp,
243            }
244        }
245        MarketUpdateStatus::MarketDeleted => WsMarketUpdate::Deleted {
246            symbol: market.symbol.clone(),
247            timestamp: update_msg.timestamp,
248        },
249        MarketUpdateStatus::MarketExpired | MarketUpdateStatus::MarketPendingSettlement => {
250            WsMarketUpdate::Expired {
251                symbol: market.symbol.clone(),
252                strike: market.strike,
253                is_call: market.option_type == OptionType::Call,
254                underlying: market.underlying.clone(),
255                expiry: market.expiry as u32,
256                timestamp: update_msg.timestamp,
257            }
258        }
259        MarketUpdateStatus::MarketCreationFailed | MarketUpdateStatus::MarketDeletionFailed => {
260            return;
261        }
262    };
263    deps.pubsub.publish_market_update(ws_update);
264
265    match update_msg.status {
266        MarketUpdateStatus::MarketCreated | MarketUpdateStatus::MarketAlreadyExists => {
267            let (symbol, currency, expiry, strike, option_type, leg) = if let Some(instrument) =
268                deps.instruments_cache.get_by_symbol(&market.symbol).await
269            {
270                let leg = options_chain::build_options_chain_leg(
271                    &instrument,
272                    &deps.quote_provider,
273                    &deps.greeks_cache,
274                    options_chain::OptionsChainSideFilter::Both,
275                )
276                .await;
277                (
278                    instrument.id,
279                    instrument.underlying.to_ascii_uppercase(),
280                    instrument.expiry,
281                    instrument.strike,
282                    instrument.option_type,
283                    leg,
284                )
285            } else {
286                tracing::warn!(
287                    symbol = %market.symbol,
288                    "Instrument cache miss on market create, falling back to market payload for options_chain upsert"
289                );
290                let leg = options_chain::build_options_chain_leg_for_symbol(
291                    &market.symbol,
292                    &market.underlying,
293                    None,
294                    &deps.quote_provider,
295                    &deps.greeks_cache,
296                    options_chain::OptionsChainSideFilter::Both,
297                )
298                .await;
299                let option_type = if market.option_type == OptionType::Call {
300                    "call".to_string()
301                } else {
302                    "put".to_string()
303                };
304                (
305                    market.symbol.clone(),
306                    market.underlying.to_ascii_uppercase(),
307                    market.expiry,
308                    market.strike,
309                    option_type,
310                    leg,
311                )
312            };
313
314            let strike = match options_chain::strike_to_f64(strike) {
315                Ok(value) => value,
316                Err(err) => {
317                    tracing::warn!(
318                        symbol = %symbol,
319                        error = %err,
320                        "Skipping options_chain create update due to invalid strike"
321                    );
322                    return;
323                }
324            };
325
326            let row = match options_chain::build_single_leg_strike_row(strike, &option_type, leg) {
327                Ok(value) => value,
328                Err(err) => {
329                    tracing::warn!(
330                        symbol = %symbol,
331                        option_type = %option_type,
332                        error = %err,
333                        "Skipping options_chain create update due to invalid option type"
334                    );
335                    return;
336                }
337            };
338
339            deps.pubsub
340                .publish_options_chain_update(WsOptionsChainUpdate::Upsert {
341                    currency,
342                    expiry,
343                    row,
344                    timestamp: update_msg.timestamp as i64,
345                });
346        }
347        MarketUpdateStatus::MarketDeleted
348        | MarketUpdateStatus::MarketExpired
349        | MarketUpdateStatus::MarketPendingSettlement => {
350            let strike = match options_chain::strike_to_f64(market.strike) {
351                Ok(value) => value,
352                Err(err) => {
353                    tracing::warn!(
354                        symbol = %market.symbol,
355                        error = %err,
356                        "Skipping options_chain remove update due to invalid strike"
357                    );
358                    return;
359                }
360            };
361            let option_type = if market.option_type == OptionType::Call {
362                "call".to_string()
363            } else {
364                "put".to_string()
365            };
366            deps.pubsub
367                .publish_options_chain_update(WsOptionsChainUpdate::Remove {
368                    currency: market.underlying.to_ascii_uppercase(),
369                    expiry: market.expiry,
370                    strike,
371                    option_type,
372                    symbol: market.symbol.clone(),
373                    timestamp: update_msg.timestamp as i64,
374                });
375        }
376        MarketUpdateStatus::MarketCreationFailed | MarketUpdateStatus::MarketDeletionFailed => {}
377    }
378}
379
380async fn handle_order_filled(
381    deps: &WsEventForwarderDeps,
382    fill: &hypercall_types::Fill,
383) -> Result<(), ()> {
384    let trade_id = fill.trade_id as i64;
385    let size_human = to_human_readable_decimal(&fill.symbol, fill.size);
386
387    let taker_fill = WsFillUpdate {
388        order_id: fill.taker_order_id as i64,
389        fill_id: trade_id,
390        symbol: fill.symbol.clone(),
391        side: format!("{:?}", fill.taker_side),
392        price: fill.price,
393        size: size_human,
394        timestamp: fill.timestamp as i64,
395        wallet_address: fill.taker_wallet_address,
396        fee: fill.fee,
397        trade_id,
398        is_taker: true,
399        builder_code_address: fill.builder_code_address,
400        builder_code_fee: fill.builder_code_fee,
401        instrument_type: "option".to_string(),
402    };
403    deps.pubsub.publish_fill(taker_fill);
404
405    let maker_fill = WsFillUpdate {
406        order_id: fill.maker_order_id as i64,
407        fill_id: trade_id,
408        symbol: fill.symbol.clone(),
409        side: match fill.taker_side {
410            hypercall_types::Side::Buy => "Sell".to_string(),
411            hypercall_types::Side::Sell => "Buy".to_string(),
412        },
413        price: fill.price,
414        size: size_human,
415        timestamp: fill.timestamp as i64,
416        wallet_address: fill.maker_wallet_address,
417        fee: fill.fee,
418        trade_id,
419        is_taker: false,
420        builder_code_address: None,
421        builder_code_fee: None,
422        instrument_type: "option".to_string(),
423    };
424    deps.pubsub.publish_fill(maker_fill);
425
426    deps.portfolio_cache
427        .publish_margin_update(&fill.taker_wallet_address)
428        .await;
429    deps.portfolio_cache
430        .publish_margin_update(&fill.maker_wallet_address)
431        .await;
432
433    publish_fill_notifications(deps, fill, size_human);
434
435    // Competition fill recording is no longer done here. It is an independent
436    // downstream consumer (hypercall_competition::CompetitionFillRecorder) wired
437    // to the engine fill stream at the root, so it is decoupled from websocket
438    // broadcast and a competition DB failure no longer crashes the engine node.
439    Ok(())
440}
441
442fn publish_fill_notifications(
443    deps: &WsEventForwarderDeps,
444    fill: &hypercall_types::Fill,
445    size_human: rust_decimal::Decimal,
446) {
447    let taker_side = format!("{:?}", fill.taker_side);
448    let taker_action = if taker_side == "Buy" {
449        "Bought"
450    } else {
451        "Sold"
452    };
453    let maker_action = if taker_side == "Buy" {
454        "Sold"
455    } else {
456        "Bought"
457    };
458
459    #[derive(serde::Serialize)]
460    struct FillPayload<'a> {
461        action: &'a str,
462        size: String,
463        symbol: &'a str,
464        price: String,
465        trade_id: u64,
466        #[serde(skip_serializing_if = "Option::is_none")]
467        realized_pnl: Option<String>,
468    }
469
470    let taker_wallet = fill.taker_wallet_address.to_string().to_lowercase();
471    let maker_wallet = fill.maker_wallet_address.to_string().to_lowercase();
472    let price_str = fill.price.to_string();
473    let size_str = size_human.to_string();
474    let symbol = fill.symbol.clone();
475    let trade_id = fill.trade_id;
476
477    if let Ok(bytes) = rmp_serde::to_vec_named(&FillPayload {
478        action: taker_action,
479        size: size_str.clone(),
480        symbol: &symbol,
481        price: price_str.clone(),
482        trade_id,
483        realized_pnl: fill.taker_realized_pnl.map(|p| p.to_string()),
484    }) {
485        let _ = deps
486            .notification_service
487            .enqueue_publish(&taker_wallet, "fill", bytes);
488    }
489    if let Ok(bytes) = rmp_serde::to_vec_named(&FillPayload {
490        action: maker_action,
491        size: size_str,
492        symbol: &symbol,
493        price: price_str,
494        trade_id,
495        realized_pnl: fill.maker_realized_pnl.map(|p| p.to_string()),
496    }) {
497        let _ = deps
498            .notification_service
499            .enqueue_publish(&maker_wallet, "fill", bytes);
500    }
501
502    if let Some(ref push_svc) = deps.push_service {
503        push_svc.send_fill_notification(
504            fill.taker_wallet_address.to_string().to_lowercase(),
505            taker_action,
506            size_human,
507            fill.symbol.clone(),
508            fill.price,
509            fill.trade_id,
510            fill.taker_realized_pnl,
511        );
512        push_svc.send_fill_notification(
513            fill.maker_wallet_address.to_string().to_lowercase(),
514            maker_action,
515            size_human,
516            fill.symbol.clone(),
517            fill.price,
518            fill.trade_id,
519            fill.maker_realized_pnl,
520        );
521    }
522}
523
524async fn handle_orderbook_updated(
525    deps: &WsEventForwarderDeps,
526    orderbook_update: &hypercall_types::OrderbookUpdate,
527) {
528    let symbol = orderbook_update.symbol.clone();
529    let timestamp_ms = orderbook_update.timestamp as i64;
530    let instrument = deps.instruments_cache.get_by_symbol(&symbol).await;
531    let option_token_address = instrument
532        .as_ref()
533        .and_then(|value| value.option_token_address);
534
535    let update = ws_orderbook_update_from_engine_update(orderbook_update, option_token_address);
536    deps.pubsub.publish_orderbook_update(update);
537
538    if let Some(instrument) = instrument {
539        let leg = options_chain::build_options_chain_leg(
540            &instrument,
541            &deps.quote_provider,
542            &deps.greeks_cache,
543            options_chain::OptionsChainSideFilter::Both,
544        )
545        .await;
546
547        let strike = match options_chain::strike_to_f64(instrument.strike) {
548            Ok(value) => value,
549            Err(err) => {
550                tracing::warn!(
551                    symbol = %instrument.id,
552                    error = %err,
553                    "Skipping options_chain orderbook update due to invalid strike"
554                );
555                return;
556            }
557        };
558
559        let row = match options_chain::build_single_leg_strike_row(
560            strike,
561            &instrument.option_type,
562            leg,
563        ) {
564            Ok(value) => value,
565            Err(err) => {
566                tracing::warn!(
567                    symbol = %instrument.id,
568                    option_type = %instrument.option_type,
569                    error = %err,
570                    "Skipping options_chain orderbook update due to invalid option type"
571                );
572                return;
573            }
574        };
575
576        deps.pubsub
577            .publish_options_chain_update(WsOptionsChainUpdate::Upsert {
578                currency: instrument.underlying.to_ascii_uppercase(),
579                expiry: instrument.expiry,
580                row,
581                timestamp: timestamp_ms,
582            });
583    }
584}
585
586async fn handle_position_expired(
587    deps: &WsEventForwarderDeps,
588    expiry_msg: hypercall_types::PositionExpiredMessage,
589) {
590    use super::WsPositionExpired;
591
592    if deps
593        .portfolio_cache
594        .has_live_position_symbol(&expiry_msg.wallet_address, &expiry_msg.symbol)
595        .await
596    {
597        deps.portfolio_cache
598            .handle_engine_message(
599                EngineMessage::PositionExpired(expiry_msg.clone()),
600                expiry_msg.timestamp as i64,
601            )
602            .await;
603    }
604
605    let ws_expiry = WsPositionExpired {
606        wallet_address: expiry_msg.wallet_address,
607        symbol: expiry_msg.symbol.clone(),
608        position_size: expiry_msg.position_size,
609        settlement_price: expiry_msg.settlement_price,
610        settlement_value: expiry_msg.settlement_value,
611        settlement_entry_price: expiry_msg.settlement_entry_price,
612        cost_basis: expiry_msg.cost_basis,
613        net_pnl: expiry_msg.net_pnl,
614        timestamp: expiry_msg.timestamp as i64,
615    };
616    deps.pubsub.publish_position_expired(ws_expiry);
617}
618
619async fn handle_liquidation_state_change(
620    deps: &WsEventForwarderDeps,
621    liq_msg: &hypercall_types::LiquidationStateMessage,
622) -> Result<(), ()> {
623    tracing::info!(
624        "Liquidation state change: wallet={}, {} -> {}",
625        liq_msg.wallet,
626        liq_msg.previous_state,
627        liq_msg.new_state
628    );
629
630    let should_publish_transition =
631        liq_msg.previous_state != liq_msg.new_state || liq_msg.projection_changed;
632    let persistence_event = EngineMessage::LiquidationStateChange(liq_msg.clone());
633    if let Err(error) = deps.db.handle_event(&persistence_event).await {
634        panic!(
635            "STATE_CORRUPTION: failed to persist liquidation state change for wallet {}: {}",
636            liq_msg.wallet, error
637        );
638    }
639
640    if should_publish_transition {
641        let ws_liq = ws_liquidation_state_change_from_engine(liq_msg);
642        deps.pubsub.publish_liquidation_state_change(ws_liq);
643
644        if let Some(ref push_svc) = deps.push_service {
645            let title = match liq_msg.new_state {
646                hypercall_types::LiquidationStateType::Healthy => "Account Healthy",
647                hypercall_types::LiquidationStateType::PreLiquidation => "Liquidation Warning",
648                hypercall_types::LiquidationStateType::InLiquidation => "Liquidation In Progress",
649                hypercall_types::LiquidationStateType::Liquidated => "Account Liquidated",
650            };
651            push_svc.send_liquidation_notification(
652                liq_msg.wallet.to_string().to_lowercase(),
653                liq_msg.previous_state,
654                liq_msg.new_state,
655                title,
656            );
657        }
658    }
659
660    if let Err(error) = deps.engine_event_tx.send(persistence_event) {
661        warn!(
662            "Failed to forward liquidation state change onto engine event bus: {}",
663            error
664        );
665    }
666
667    Ok(())
668}
669
670fn checked_liquidation_timestamp(field: &str, value: u64) -> i64 {
671    i64::try_from(value).unwrap_or_else(|_| {
672        panic!(
673            "STATE_CORRUPTION: liquidation {} {} exceeds i64 range",
674            field, value
675        )
676    })
677}
678
679fn partial_liquidation_state(
680    status: &hypercall_types::liquidation_state::AccountLiquidationStatus,
681) -> Option<hypercall_types::WsPartialLiquidationState> {
682    let hypercall_types::liquidation_state::LiquidationState::PreLiquidation(metadata) =
683        &status.state
684    else {
685        return None;
686    };
687
688    Some(hypercall_types::WsPartialLiquidationState {
689        entered_at: checked_liquidation_timestamp("entered_at", metadata.entered_at),
690        target_equity: metadata.target_equity,
691        mm_shortfall: metadata.mm_shortfall,
692        escalation_deadline: checked_liquidation_timestamp(
693            "escalation_deadline",
694            metadata.escalation_deadline,
695        ),
696        last_reprice_at: metadata
697            .last_reprice_at
698            .map(|value| checked_liquidation_timestamp("last_reprice_at", value)),
699        active_order_request_ids: metadata.active_order_request_ids.clone(),
700        active_order_client_ids: metadata.active_order_client_ids.clone(),
701        bonus_bps: metadata.bonus_bps as i32,
702        pending_full_auction_id: metadata.pending_full_auction_id.clone(),
703        pending_full_request_id: metadata.pending_full_request_id.clone(),
704        pending_full_tx_hash: metadata.pending_full_tx_hash.clone(),
705        pending_full_margin_needed: metadata.pending_full_margin_needed,
706    })
707}
708
709fn full_liquidation_state(
710    status: &hypercall_types::liquidation_state::AccountLiquidationStatus,
711) -> Option<hypercall_types::WsFullLiquidationState> {
712    match &status.state {
713        hypercall_types::liquidation_state::LiquidationState::InLiquidation(metadata) => {
714            Some(hypercall_types::WsFullLiquidationState {
715                auction_id: Some(metadata.auction_id.clone()),
716                request_id: metadata.request_id.clone(),
717                tx_hash: metadata.tx_hash.clone(),
718                started_at: Some(checked_liquidation_timestamp(
719                    "started_at",
720                    metadata.started_at,
721                )),
722                chain_start_time: metadata
723                    .chain_start_time
724                    .map(|value| checked_liquidation_timestamp("chain_start_time", value)),
725                margin_needed: Some(metadata.margin_needed),
726                stop_request_id: metadata.stop_request_id.clone(),
727                stop_tx_hash: metadata.stop_tx_hash.clone(),
728                liquidated_at: None,
729                winner: None,
730                bonus: None,
731                resolution_tx_hash: None,
732            })
733        }
734        hypercall_types::liquidation_state::LiquidationState::Liquidated(metadata) => {
735            Some(hypercall_types::WsFullLiquidationState {
736                auction_id: Some(metadata.auction_id.clone()),
737                request_id: None,
738                tx_hash: None,
739                started_at: None,
740                chain_start_time: None,
741                margin_needed: None,
742                stop_request_id: None,
743                stop_tx_hash: None,
744                liquidated_at: Some(checked_liquidation_timestamp(
745                    "completed_at",
746                    metadata.completed_at,
747                )),
748                winner: metadata.winner.map(|winner| winner.to_string()),
749                bonus: Some(metadata.bonus),
750                resolution_tx_hash: metadata.tx_hash.clone(),
751            })
752        }
753        hypercall_types::liquidation_state::LiquidationState::Healthy
754        | hypercall_types::liquidation_state::LiquidationState::PreLiquidation(..) => None,
755    }
756}
757
758fn ws_liquidation_state_change_from_engine(
759    message: &hypercall_types::LiquidationStateMessage,
760) -> WsLiquidationStateChange {
761    WsLiquidationStateChange {
762        wallet_address: message.wallet,
763        previous_state: message.previous_state.as_str().to_string(),
764        new_state: message.new_state.as_str().to_string(),
765        liquidation_mode: message.liquidation_mode.clone(),
766        margin_mode: message.margin_mode.clone(),
767        equity: message.equity,
768        mm_required: message.mm_required,
769        maintenance_margin: message.maintenance_margin,
770        shortfall: message.shortfall,
771        partial_liquidation: partial_liquidation_state(&message.status),
772        full_liquidation: full_liquidation_state(&message.status),
773        timestamp: checked_liquidation_timestamp("event timestamp", message.timestamp),
774    }
775}
776
777pub fn ws_orderbook_update_from_engine_update(
778    orderbook_update_contract_units_raw: &hypercall_types::OrderbookUpdate,
779    option_token_address: Option<WalletAddress>,
780) -> super::WsOrderbookUpdate {
781    let bids_human_contracts = orderbook_update_contract_units_raw
782        .bids
783        .iter()
784        .map(|(price, size_contract_units_raw)| {
785            (
786                *price,
787                to_human_readable_decimal(
788                    &orderbook_update_contract_units_raw.symbol,
789                    *size_contract_units_raw,
790                ),
791            )
792        })
793        .collect();
794
795    let asks_human_contracts = orderbook_update_contract_units_raw
796        .asks
797        .iter()
798        .map(|(price, size_contract_units_raw)| {
799            (
800                *price,
801                to_human_readable_decimal(
802                    &orderbook_update_contract_units_raw.symbol,
803                    *size_contract_units_raw,
804                ),
805            )
806        })
807        .collect();
808
809    super::WsOrderbookUpdate {
810        symbol: orderbook_update_contract_units_raw.symbol.clone(),
811        option_token_address,
812        bids: bids_human_contracts,
813        asks: asks_human_contracts,
814        timestamp: orderbook_update_contract_units_raw.timestamp as i64,
815    }
816}