1use std::sync::Arc;
2use std::time::Instant;
3
4use metrics::{counter, histogram};
5use tokio::sync::mpsc;
6use tracing::{info, warn};
7
8use hypercall_types::{to_human_readable_decimal, WalletAddress};
9
10use super::{PubSubManager, WsFillUpdate, WsLiquidationStateChange, WsOptionsChainUpdate};
11use crate::boundary::market_inputs::GreeksCacheReader;
12use crate::boundary::market_inputs::InstrumentsCacheReader;
13use crate::boundary::read_models::PortfolioCacheApi;
14use crate::boundary::read_models::TierCacheApi;
15use crate::notification_service::NotificationService;
16use crate::options_chain;
17use crate::push_service::PushNotificationService;
18use hypercall_runtime_api::QuoteProvider;
19use hypercall_runtime_api::{EngineEventPersistence, Shutdown};
20use hypercall_types::ws_protocol::{WsOrderMessage, WsOrderRequest};
21use hypercall_types::{EngineMessage, OrderUpdateStatus};
22
23pub struct WsEventForwarderDeps {
24 pub pubsub: Arc<PubSubManager>,
25 pub portfolio_cache: Arc<dyn PortfolioCacheApi>,
26 pub instruments_cache: Arc<dyn InstrumentsCacheReader>,
27 pub quote_provider: Arc<dyn QuoteProvider>,
28 pub greeks_cache: Arc<dyn GreeksCacheReader>,
29 pub tier_cache: Arc<dyn TierCacheApi>,
30 pub push_service: Option<Arc<PushNotificationService>>,
31 pub db: Arc<dyn EngineEventPersistence>,
32 pub engine_event_tx: mpsc::UnboundedSender<EngineMessage>,
33 pub notification_service: Arc<NotificationService>,
34 pub shutdown: Shutdown,
35}
36
37pub async fn run(
38 deps: WsEventForwarderDeps,
39 mut ws_receiver: mpsc::UnboundedReceiver<EngineMessage>,
40) -> anyhow::Result<()> {
41 let mut shutdown_rx = deps.shutdown.subscribe();
42
43 loop {
44 let event = tokio::select! {
45 _ = shutdown_rx.recv() => {
46 info!("WsEventForwarder received shutdown signal");
47 break;
48 },
49 maybe_event = ws_receiver.recv() => {
50 match maybe_event {
51 Some(e) => e,
52 None => break,
53 }
54 }
55 };
56
57 let process_start = Instant::now();
58 let event_type = event_type_label(&event);
59
60 tracing::trace!(event_type, "WsEventForwarder received event");
61
62 match event {
63 EngineMessage::OrderAction(_)
64 | EngineMessage::MarketAction(_)
65 | EngineMessage::OrderInfo(_) => {
66 continue;
67 }
68 EngineMessage::OrderUpdate(update_msg) => {
69 if update_msg.status == OrderUpdateStatus::PartiallyFilled {
70 continue;
71 }
72
73 let request = WsOrderRequest {
74 price: update_msg.info.price,
75 size: to_human_readable_decimal(&update_msg.info.symbol, update_msg.info.size),
76 symbol: update_msg.info.symbol,
77 side: update_msg.info.side,
78 tif: update_msg.info.tif,
79 };
80
81 let msg = WsOrderMessage {
82 order_id: update_msg.order_id,
83 request,
84 status: update_msg.status,
85 timestamp: update_msg.timestamp,
86 reason: update_msg.reason,
87 wallet_address: update_msg.wallet_address,
88 instrument_type: if update_msg.info.is_perp {
89 "perp".to_string()
90 } else {
91 "option".to_string()
92 },
93 };
94 deps.pubsub.publish_order_update(msg);
95 }
96 EngineMessage::MarketUpdate(update_msg) => {
97 handle_market_update(&deps, update_msg).await;
98 }
99 EngineMessage::OrderFilled { ref fill, .. } => {
100 if handle_order_filled(&deps, fill).await.is_err() {
101 break;
102 }
103 }
104 EngineMessage::OrderbookUpdated(orderbook_update) => {
105 handle_orderbook_updated(&deps, &orderbook_update).await;
106 }
107 EngineMessage::L2Update(_) => {
108 continue;
109 }
110 EngineMessage::Trade(trade_msg) => {
111 use super::WsTradeUpdate;
112 let size_human = to_human_readable_decimal(&trade_msg.symbol, trade_msg.size);
113 let trade_update = WsTradeUpdate {
114 symbol: trade_msg.symbol.clone(),
115 price: trade_msg.price,
116 size: size_human,
117 side: format!("{:?}", trade_msg.side),
118 timestamp: trade_msg.timestamp as i64,
119 };
120 deps.pubsub.publish_trade(trade_update);
121 }
122 EngineMessage::TransactionRequest(tx_req) => {
123 tracing::info!(
124 "Received transaction request for processing: {}",
125 tx_req.request_id
126 );
127 }
128 EngineMessage::TransactionUpdate(tx_update) => {
129 tracing::info!(
130 "Transaction update: {} -> {:?}",
131 tx_update.request_id,
132 tx_update.status
133 );
134 }
135 EngineMessage::MmpTriggered(mmp_msg) => {
136 tracing::warn!(
137 "MMP Triggered - wallet={}, currency={}, reason={}",
138 mmp_msg.wallet,
139 mmp_msg.currency,
140 mmp_msg.reason
141 );
142 }
143 EngineMessage::PositionExpired(expiry_msg) => {
144 handle_position_expired(&deps, expiry_msg).await;
145 }
146 EngineMessage::TierUpdate(tier_update) => {
147 if let Ok(mode) = tier_update
148 .margin_mode
149 .parse::<hypercall_types::MarginMode>()
150 {
151 deps.tier_cache
152 .apply_margin_mode_update(tier_update.wallet, mode, tier_update.version)
153 .await;
154 } else {
155 tracing::warn!(
156 "Invalid margin mode '{}' in tier update for wallet {}",
157 tier_update.margin_mode,
158 tier_update.wallet
159 );
160 }
161 }
162 EngineMessage::HypercorePositionUpdate(update) => {
163 deps.portfolio_cache
164 .handle_hypercore_position_update(update)
165 .await;
166 }
167 EngineMessage::LiquidationStateChange(liq_msg) => {
168 if handle_liquidation_state_change(&deps, &liq_msg)
169 .await
170 .is_err()
171 {
172 break;
173 }
174 }
175 EngineMessage::RfqFilled(msg) => {
176 tracing::info!(
177 "RFQ filled: rfq_id={}, taker={}, qp={}, premium={}",
178 msg.rfq_id,
179 msg.taker_wallet,
180 msg.qp_wallet,
181 msg.net_premium
182 );
183 metrics::counter!("ht_rfq_fills_total").increment(1);
184 }
185 }
186
187 let process_ms = process_start.elapsed().as_secs_f64() * 1000.0;
188 histogram!("ht_ws_forwarder_process_ms", "event_type" => event_type.to_string())
189 .record(process_ms);
190 counter!("ht_ws_forwarder_events_processed", "event_type" => event_type.to_string())
191 .increment(1);
192
193 if process_ms > 50.0 {
194 tracing::warn!(
195 event_type = %event_type,
196 process_ms = %process_ms,
197 "Slow WsEventForwarder processing"
198 );
199 }
200 }
201 Ok(())
202}
203
204fn event_type_label(event: &EngineMessage) -> &'static str {
205 match event {
206 EngineMessage::OrderAction(_) => "OrderAction",
207 EngineMessage::OrderUpdate(_) => "OrderUpdate",
208 EngineMessage::OrderInfo(_) => "OrderInfo",
209 EngineMessage::MarketAction(_) => "MarketAction",
210 EngineMessage::MarketUpdate(_) => "MarketUpdate",
211 EngineMessage::OrderFilled { .. } => "OrderFilled",
212 EngineMessage::OrderbookUpdated(_) => "OrderbookUpdated",
213 EngineMessage::L2Update(_) => "L2Update",
214 EngineMessage::Trade(_) => "Trade",
215 EngineMessage::TransactionRequest(_) => "TransactionRequest",
216 EngineMessage::TransactionUpdate(_) => "TransactionUpdate",
217 EngineMessage::MmpTriggered(_) => "MmpTriggered",
218 EngineMessage::PositionExpired(_) => "PositionExpired",
219 EngineMessage::TierUpdate(_) => "TierUpdate",
220 EngineMessage::HypercorePositionUpdate(_) => "HypercorePositionUpdate",
221 EngineMessage::LiquidationStateChange(_) => "LiquidationStateChange",
222 EngineMessage::RfqFilled(_) => "RfqFilled",
223 }
224}
225
226async fn handle_market_update(
227 deps: &WsEventForwarderDeps,
228 update_msg: hypercall_types::MarketUpdateMessage,
229) {
230 use super::WsMarketUpdate;
231 use hypercall_types::{MarketUpdateStatus, OptionType};
232
233 let market = update_msg.market.clone();
234 let ws_update = match update_msg.status {
235 MarketUpdateStatus::MarketCreated | MarketUpdateStatus::MarketAlreadyExists => {
236 WsMarketUpdate::Created {
237 symbol: market.symbol.clone(),
238 strike: market.strike,
239 is_call: market.option_type == OptionType::Call,
240 underlying: market.underlying.clone(),
241 expiry: market.expiry as u32,
242 timestamp: update_msg.timestamp,
243 }
244 }
245 MarketUpdateStatus::MarketDeleted => WsMarketUpdate::Deleted {
246 symbol: market.symbol.clone(),
247 timestamp: update_msg.timestamp,
248 },
249 MarketUpdateStatus::MarketExpired | MarketUpdateStatus::MarketPendingSettlement => {
250 WsMarketUpdate::Expired {
251 symbol: market.symbol.clone(),
252 strike: market.strike,
253 is_call: market.option_type == OptionType::Call,
254 underlying: market.underlying.clone(),
255 expiry: market.expiry as u32,
256 timestamp: update_msg.timestamp,
257 }
258 }
259 MarketUpdateStatus::MarketCreationFailed | MarketUpdateStatus::MarketDeletionFailed => {
260 return;
261 }
262 };
263 deps.pubsub.publish_market_update(ws_update);
264
265 match update_msg.status {
266 MarketUpdateStatus::MarketCreated | MarketUpdateStatus::MarketAlreadyExists => {
267 let (symbol, currency, expiry, strike, option_type, leg) = if let Some(instrument) =
268 deps.instruments_cache.get_by_symbol(&market.symbol).await
269 {
270 let leg = options_chain::build_options_chain_leg(
271 &instrument,
272 &deps.quote_provider,
273 &deps.greeks_cache,
274 options_chain::OptionsChainSideFilter::Both,
275 )
276 .await;
277 (
278 instrument.id,
279 instrument.underlying.to_ascii_uppercase(),
280 instrument.expiry,
281 instrument.strike,
282 instrument.option_type,
283 leg,
284 )
285 } else {
286 tracing::warn!(
287 symbol = %market.symbol,
288 "Instrument cache miss on market create, falling back to market payload for options_chain upsert"
289 );
290 let leg = options_chain::build_options_chain_leg_for_symbol(
291 &market.symbol,
292 &market.underlying,
293 None,
294 &deps.quote_provider,
295 &deps.greeks_cache,
296 options_chain::OptionsChainSideFilter::Both,
297 )
298 .await;
299 let option_type = if market.option_type == OptionType::Call {
300 "call".to_string()
301 } else {
302 "put".to_string()
303 };
304 (
305 market.symbol.clone(),
306 market.underlying.to_ascii_uppercase(),
307 market.expiry,
308 market.strike,
309 option_type,
310 leg,
311 )
312 };
313
314 let strike = match options_chain::strike_to_f64(strike) {
315 Ok(value) => value,
316 Err(err) => {
317 tracing::warn!(
318 symbol = %symbol,
319 error = %err,
320 "Skipping options_chain create update due to invalid strike"
321 );
322 return;
323 }
324 };
325
326 let row = match options_chain::build_single_leg_strike_row(strike, &option_type, leg) {
327 Ok(value) => value,
328 Err(err) => {
329 tracing::warn!(
330 symbol = %symbol,
331 option_type = %option_type,
332 error = %err,
333 "Skipping options_chain create update due to invalid option type"
334 );
335 return;
336 }
337 };
338
339 deps.pubsub
340 .publish_options_chain_update(WsOptionsChainUpdate::Upsert {
341 currency,
342 expiry,
343 row,
344 timestamp: update_msg.timestamp as i64,
345 });
346 }
347 MarketUpdateStatus::MarketDeleted
348 | MarketUpdateStatus::MarketExpired
349 | MarketUpdateStatus::MarketPendingSettlement => {
350 let strike = match options_chain::strike_to_f64(market.strike) {
351 Ok(value) => value,
352 Err(err) => {
353 tracing::warn!(
354 symbol = %market.symbol,
355 error = %err,
356 "Skipping options_chain remove update due to invalid strike"
357 );
358 return;
359 }
360 };
361 let option_type = if market.option_type == OptionType::Call {
362 "call".to_string()
363 } else {
364 "put".to_string()
365 };
366 deps.pubsub
367 .publish_options_chain_update(WsOptionsChainUpdate::Remove {
368 currency: market.underlying.to_ascii_uppercase(),
369 expiry: market.expiry,
370 strike,
371 option_type,
372 symbol: market.symbol.clone(),
373 timestamp: update_msg.timestamp as i64,
374 });
375 }
376 MarketUpdateStatus::MarketCreationFailed | MarketUpdateStatus::MarketDeletionFailed => {}
377 }
378}
379
380async fn handle_order_filled(
381 deps: &WsEventForwarderDeps,
382 fill: &hypercall_types::Fill,
383) -> Result<(), ()> {
384 let trade_id = fill.trade_id as i64;
385 let size_human = to_human_readable_decimal(&fill.symbol, fill.size);
386
387 let taker_fill = WsFillUpdate {
388 order_id: fill.taker_order_id as i64,
389 fill_id: trade_id,
390 symbol: fill.symbol.clone(),
391 side: format!("{:?}", fill.taker_side),
392 price: fill.price,
393 size: size_human,
394 timestamp: fill.timestamp as i64,
395 wallet_address: fill.taker_wallet_address,
396 fee: fill.fee,
397 trade_id,
398 is_taker: true,
399 builder_code_address: fill.builder_code_address,
400 builder_code_fee: fill.builder_code_fee,
401 instrument_type: "option".to_string(),
402 };
403 deps.pubsub.publish_fill(taker_fill);
404
405 let maker_fill = WsFillUpdate {
406 order_id: fill.maker_order_id as i64,
407 fill_id: trade_id,
408 symbol: fill.symbol.clone(),
409 side: match fill.taker_side {
410 hypercall_types::Side::Buy => "Sell".to_string(),
411 hypercall_types::Side::Sell => "Buy".to_string(),
412 },
413 price: fill.price,
414 size: size_human,
415 timestamp: fill.timestamp as i64,
416 wallet_address: fill.maker_wallet_address,
417 fee: fill.fee,
418 trade_id,
419 is_taker: false,
420 builder_code_address: None,
421 builder_code_fee: None,
422 instrument_type: "option".to_string(),
423 };
424 deps.pubsub.publish_fill(maker_fill);
425
426 deps.portfolio_cache
427 .publish_margin_update(&fill.taker_wallet_address)
428 .await;
429 deps.portfolio_cache
430 .publish_margin_update(&fill.maker_wallet_address)
431 .await;
432
433 publish_fill_notifications(deps, fill, size_human);
434
435 Ok(())
440}
441
442fn publish_fill_notifications(
443 deps: &WsEventForwarderDeps,
444 fill: &hypercall_types::Fill,
445 size_human: rust_decimal::Decimal,
446) {
447 let taker_side = format!("{:?}", fill.taker_side);
448 let taker_action = if taker_side == "Buy" {
449 "Bought"
450 } else {
451 "Sold"
452 };
453 let maker_action = if taker_side == "Buy" {
454 "Sold"
455 } else {
456 "Bought"
457 };
458
459 #[derive(serde::Serialize)]
460 struct FillPayload<'a> {
461 action: &'a str,
462 size: String,
463 symbol: &'a str,
464 price: String,
465 trade_id: u64,
466 #[serde(skip_serializing_if = "Option::is_none")]
467 realized_pnl: Option<String>,
468 }
469
470 let taker_wallet = fill.taker_wallet_address.to_string().to_lowercase();
471 let maker_wallet = fill.maker_wallet_address.to_string().to_lowercase();
472 let price_str = fill.price.to_string();
473 let size_str = size_human.to_string();
474 let symbol = fill.symbol.clone();
475 let trade_id = fill.trade_id;
476
477 if let Ok(bytes) = rmp_serde::to_vec_named(&FillPayload {
478 action: taker_action,
479 size: size_str.clone(),
480 symbol: &symbol,
481 price: price_str.clone(),
482 trade_id,
483 realized_pnl: fill.taker_realized_pnl.map(|p| p.to_string()),
484 }) {
485 let _ = deps
486 .notification_service
487 .enqueue_publish(&taker_wallet, "fill", bytes);
488 }
489 if let Ok(bytes) = rmp_serde::to_vec_named(&FillPayload {
490 action: maker_action,
491 size: size_str,
492 symbol: &symbol,
493 price: price_str,
494 trade_id,
495 realized_pnl: fill.maker_realized_pnl.map(|p| p.to_string()),
496 }) {
497 let _ = deps
498 .notification_service
499 .enqueue_publish(&maker_wallet, "fill", bytes);
500 }
501
502 if let Some(ref push_svc) = deps.push_service {
503 push_svc.send_fill_notification(
504 fill.taker_wallet_address.to_string().to_lowercase(),
505 taker_action,
506 size_human,
507 fill.symbol.clone(),
508 fill.price,
509 fill.trade_id,
510 fill.taker_realized_pnl,
511 );
512 push_svc.send_fill_notification(
513 fill.maker_wallet_address.to_string().to_lowercase(),
514 maker_action,
515 size_human,
516 fill.symbol.clone(),
517 fill.price,
518 fill.trade_id,
519 fill.maker_realized_pnl,
520 );
521 }
522}
523
524async fn handle_orderbook_updated(
525 deps: &WsEventForwarderDeps,
526 orderbook_update: &hypercall_types::OrderbookUpdate,
527) {
528 let symbol = orderbook_update.symbol.clone();
529 let timestamp_ms = orderbook_update.timestamp as i64;
530 let instrument = deps.instruments_cache.get_by_symbol(&symbol).await;
531 let option_token_address = instrument
532 .as_ref()
533 .and_then(|value| value.option_token_address);
534
535 let update = ws_orderbook_update_from_engine_update(orderbook_update, option_token_address);
536 deps.pubsub.publish_orderbook_update(update);
537
538 if let Some(instrument) = instrument {
539 let leg = options_chain::build_options_chain_leg(
540 &instrument,
541 &deps.quote_provider,
542 &deps.greeks_cache,
543 options_chain::OptionsChainSideFilter::Both,
544 )
545 .await;
546
547 let strike = match options_chain::strike_to_f64(instrument.strike) {
548 Ok(value) => value,
549 Err(err) => {
550 tracing::warn!(
551 symbol = %instrument.id,
552 error = %err,
553 "Skipping options_chain orderbook update due to invalid strike"
554 );
555 return;
556 }
557 };
558
559 let row = match options_chain::build_single_leg_strike_row(
560 strike,
561 &instrument.option_type,
562 leg,
563 ) {
564 Ok(value) => value,
565 Err(err) => {
566 tracing::warn!(
567 symbol = %instrument.id,
568 option_type = %instrument.option_type,
569 error = %err,
570 "Skipping options_chain orderbook update due to invalid option type"
571 );
572 return;
573 }
574 };
575
576 deps.pubsub
577 .publish_options_chain_update(WsOptionsChainUpdate::Upsert {
578 currency: instrument.underlying.to_ascii_uppercase(),
579 expiry: instrument.expiry,
580 row,
581 timestamp: timestamp_ms,
582 });
583 }
584}
585
586async fn handle_position_expired(
587 deps: &WsEventForwarderDeps,
588 expiry_msg: hypercall_types::PositionExpiredMessage,
589) {
590 use super::WsPositionExpired;
591
592 if deps
593 .portfolio_cache
594 .has_live_position_symbol(&expiry_msg.wallet_address, &expiry_msg.symbol)
595 .await
596 {
597 deps.portfolio_cache
598 .handle_engine_message(
599 EngineMessage::PositionExpired(expiry_msg.clone()),
600 expiry_msg.timestamp as i64,
601 )
602 .await;
603 }
604
605 let ws_expiry = WsPositionExpired {
606 wallet_address: expiry_msg.wallet_address,
607 symbol: expiry_msg.symbol.clone(),
608 position_size: expiry_msg.position_size,
609 settlement_price: expiry_msg.settlement_price,
610 settlement_value: expiry_msg.settlement_value,
611 settlement_entry_price: expiry_msg.settlement_entry_price,
612 cost_basis: expiry_msg.cost_basis,
613 net_pnl: expiry_msg.net_pnl,
614 timestamp: expiry_msg.timestamp as i64,
615 };
616 deps.pubsub.publish_position_expired(ws_expiry);
617}
618
619async fn handle_liquidation_state_change(
620 deps: &WsEventForwarderDeps,
621 liq_msg: &hypercall_types::LiquidationStateMessage,
622) -> Result<(), ()> {
623 tracing::info!(
624 "Liquidation state change: wallet={}, {} -> {}",
625 liq_msg.wallet,
626 liq_msg.previous_state,
627 liq_msg.new_state
628 );
629
630 let should_publish_transition =
631 liq_msg.previous_state != liq_msg.new_state || liq_msg.projection_changed;
632 let persistence_event = EngineMessage::LiquidationStateChange(liq_msg.clone());
633 if let Err(error) = deps.db.handle_event(&persistence_event).await {
634 panic!(
635 "STATE_CORRUPTION: failed to persist liquidation state change for wallet {}: {}",
636 liq_msg.wallet, error
637 );
638 }
639
640 if should_publish_transition {
641 let ws_liq = ws_liquidation_state_change_from_engine(liq_msg);
642 deps.pubsub.publish_liquidation_state_change(ws_liq);
643
644 if let Some(ref push_svc) = deps.push_service {
645 let title = match liq_msg.new_state {
646 hypercall_types::LiquidationStateType::Healthy => "Account Healthy",
647 hypercall_types::LiquidationStateType::PreLiquidation => "Liquidation Warning",
648 hypercall_types::LiquidationStateType::InLiquidation => "Liquidation In Progress",
649 hypercall_types::LiquidationStateType::Liquidated => "Account Liquidated",
650 };
651 push_svc.send_liquidation_notification(
652 liq_msg.wallet.to_string().to_lowercase(),
653 liq_msg.previous_state,
654 liq_msg.new_state,
655 title,
656 );
657 }
658 }
659
660 if let Err(error) = deps.engine_event_tx.send(persistence_event) {
661 warn!(
662 "Failed to forward liquidation state change onto engine event bus: {}",
663 error
664 );
665 }
666
667 Ok(())
668}
669
/// Converts an unsigned liquidation timestamp to `i64`.
///
/// `field` names the value being converted and is only used in the panic message.
///
/// # Panics
/// Panics with a `STATE_CORRUPTION` message when `value` does not fit in an `i64`.
fn checked_liquidation_timestamp(field: &str, value: u64) -> i64 {
    match i64::try_from(value) {
        Ok(timestamp) => timestamp,
        Err(_) => panic!(
            "STATE_CORRUPTION: liquidation {} {} exceeds i64 range",
            field, value
        ),
    }
}
678
679fn partial_liquidation_state(
680 status: &hypercall_types::liquidation_state::AccountLiquidationStatus,
681) -> Option<hypercall_types::WsPartialLiquidationState> {
682 let hypercall_types::liquidation_state::LiquidationState::PreLiquidation(metadata) =
683 &status.state
684 else {
685 return None;
686 };
687
688 Some(hypercall_types::WsPartialLiquidationState {
689 entered_at: checked_liquidation_timestamp("entered_at", metadata.entered_at),
690 target_equity: metadata.target_equity,
691 mm_shortfall: metadata.mm_shortfall,
692 escalation_deadline: checked_liquidation_timestamp(
693 "escalation_deadline",
694 metadata.escalation_deadline,
695 ),
696 last_reprice_at: metadata
697 .last_reprice_at
698 .map(|value| checked_liquidation_timestamp("last_reprice_at", value)),
699 active_order_request_ids: metadata.active_order_request_ids.clone(),
700 active_order_client_ids: metadata.active_order_client_ids.clone(),
701 bonus_bps: metadata.bonus_bps as i32,
702 pending_full_auction_id: metadata.pending_full_auction_id.clone(),
703 pending_full_request_id: metadata.pending_full_request_id.clone(),
704 pending_full_tx_hash: metadata.pending_full_tx_hash.clone(),
705 pending_full_margin_needed: metadata.pending_full_margin_needed,
706 })
707}
708
709fn full_liquidation_state(
710 status: &hypercall_types::liquidation_state::AccountLiquidationStatus,
711) -> Option<hypercall_types::WsFullLiquidationState> {
712 match &status.state {
713 hypercall_types::liquidation_state::LiquidationState::InLiquidation(metadata) => {
714 Some(hypercall_types::WsFullLiquidationState {
715 auction_id: Some(metadata.auction_id.clone()),
716 request_id: metadata.request_id.clone(),
717 tx_hash: metadata.tx_hash.clone(),
718 started_at: Some(checked_liquidation_timestamp(
719 "started_at",
720 metadata.started_at,
721 )),
722 chain_start_time: metadata
723 .chain_start_time
724 .map(|value| checked_liquidation_timestamp("chain_start_time", value)),
725 margin_needed: Some(metadata.margin_needed),
726 stop_request_id: metadata.stop_request_id.clone(),
727 stop_tx_hash: metadata.stop_tx_hash.clone(),
728 liquidated_at: None,
729 winner: None,
730 bonus: None,
731 resolution_tx_hash: None,
732 })
733 }
734 hypercall_types::liquidation_state::LiquidationState::Liquidated(metadata) => {
735 Some(hypercall_types::WsFullLiquidationState {
736 auction_id: Some(metadata.auction_id.clone()),
737 request_id: None,
738 tx_hash: None,
739 started_at: None,
740 chain_start_time: None,
741 margin_needed: None,
742 stop_request_id: None,
743 stop_tx_hash: None,
744 liquidated_at: Some(checked_liquidation_timestamp(
745 "completed_at",
746 metadata.completed_at,
747 )),
748 winner: metadata.winner.map(|winner| winner.to_string()),
749 bonus: Some(metadata.bonus),
750 resolution_tx_hash: metadata.tx_hash.clone(),
751 })
752 }
753 hypercall_types::liquidation_state::LiquidationState::Healthy
754 | hypercall_types::liquidation_state::LiquidationState::PreLiquidation(..) => None,
755 }
756}
757
758fn ws_liquidation_state_change_from_engine(
759 message: &hypercall_types::LiquidationStateMessage,
760) -> WsLiquidationStateChange {
761 WsLiquidationStateChange {
762 wallet_address: message.wallet,
763 previous_state: message.previous_state.as_str().to_string(),
764 new_state: message.new_state.as_str().to_string(),
765 liquidation_mode: message.liquidation_mode.clone(),
766 margin_mode: message.margin_mode.clone(),
767 equity: message.equity,
768 mm_required: message.mm_required,
769 maintenance_margin: message.maintenance_margin,
770 shortfall: message.shortfall,
771 partial_liquidation: partial_liquidation_state(&message.status),
772 full_liquidation: full_liquidation_state(&message.status),
773 timestamp: checked_liquidation_timestamp("event timestamp", message.timestamp),
774 }
775}
776
777pub fn ws_orderbook_update_from_engine_update(
778 orderbook_update_contract_units_raw: &hypercall_types::OrderbookUpdate,
779 option_token_address: Option<WalletAddress>,
780) -> super::WsOrderbookUpdate {
781 let bids_human_contracts = orderbook_update_contract_units_raw
782 .bids
783 .iter()
784 .map(|(price, size_contract_units_raw)| {
785 (
786 *price,
787 to_human_readable_decimal(
788 &orderbook_update_contract_units_raw.symbol,
789 *size_contract_units_raw,
790 ),
791 )
792 })
793 .collect();
794
795 let asks_human_contracts = orderbook_update_contract_units_raw
796 .asks
797 .iter()
798 .map(|(price, size_contract_units_raw)| {
799 (
800 *price,
801 to_human_readable_decimal(
802 &orderbook_update_contract_units_raw.symbol,
803 *size_contract_units_raw,
804 ),
805 )
806 })
807 .collect();
808
809 super::WsOrderbookUpdate {
810 symbol: orderbook_update_contract_units_raw.symbol.clone(),
811 option_token_address,
812 bids: bids_human_contracts,
813 asks: asks_human_contracts,
814 timestamp: orderbook_update_contract_units_raw.timestamp as i64,
815 }
816}