1use crate::fill_accounting::FillAccounting;
6use crate::hypercore_position::HypercorePositionUpdate;
7use crate::liquidation_state::AccountLiquidationStatus;
8use crate::settlement_events::PositionExpiredMessage;
9use crate::topics;
10use crate::{
11 Fill, L2Message, Market, MarketUpdateMessage, OrderAction, OrderInfo, OrderUpdateMessage,
12 OrderbookUpdate, Side, TradeMessage, WalletAddress,
13};
14use rust_decimal::Decimal;
15use serde::{Deserialize, Serialize};
16
17pub use crate::enums::MarketAction;
20pub use crate::enums::TransactionStatus;
22
/// Payload of [`EngineMessage::OrderAction`]: an order action plus the wallet it
/// is associated with.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderActionMessage {
    /// Time of the action. NOTE(review): unit is not visible here — presumably
    /// epoch milliseconds; confirm.
    pub timestamp: u64,
    /// Order details; `info.client_id`, when set, is used as the partition key.
    pub info: OrderInfo,
    /// The action being applied to the order.
    pub action: OrderAction,
    /// Wallet for the order; forms the fallback partition key `order_<wallet>`
    /// when `info.client_id` is `None`.
    pub wallet: WalletAddress,
    /// Optional API wallet address.
    /// NOTE(review): presumably a delegated signer for `wallet` — confirm.
    pub api_wallet_address: Option<WalletAddress>,
    /// Defaults to `false` when the field is absent from the payload.
    #[serde(default)]
    pub mmp_triggered: bool,
    /// Optional request identifier; defaults to `None` when absent and is
    /// omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
}
38
/// Payload of [`EngineMessage::OrderInfo`]: order details keyed by order id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderInfoMessage {
    /// NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm.
    pub timestamp: u64,
    /// Order identifier; its decimal string form is the partition key.
    pub order_id: u64,
    /// Wallet associated with the order.
    pub wallet: WalletAddress,
    /// The order details.
    pub info: OrderInfo,
}
46
/// A single order entry inside a [`HyperLiquidOrderAction`].
///
/// Field names are single letters and are serialized as-is.
/// NOTE(review): the per-field meanings below follow the public Hyperliquid
/// exchange API naming and are not established by this file — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HyperLiquidOrder {
    /// Presumably the asset index.
    pub a: u32,
    /// Presumably `true` for a buy.
    pub b: bool,
    /// Presumably the price, as a string.
    pub p: String,
    /// Presumably the size, as a string.
    pub s: String,
    /// Presumably the reduce-only flag.
    pub r: bool,
    /// Order type (limit or trigger).
    pub t: HyperLiquidOrderType,
    /// Presumably the client order id; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub c: Option<String>,
}
59
/// Order type of a [`HyperLiquidOrder`].
///
/// Serialized untagged: no variant name is written, so the two shapes are told
/// apart by their single field key (`limit` or `trigger`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum HyperLiquidOrderType {
    /// Limit order carrying its time-in-force settings.
    Limit { limit: HyperLiquidTif },
    /// Trigger order carrying its trigger settings.
    Trigger { trigger: HyperLiquidTrigger },
}
66
/// Time-in-force wrapper used by [`HyperLiquidOrderType::Limit`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HyperLiquidTif {
    /// Time-in-force value as a free-form string.
    /// NOTE(review): accepted values are not constrained here — confirm against the exchange API.
    pub tif: String,
}
71
/// Trigger settings used by [`HyperLiquidOrderType::Trigger`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HyperLiquidTrigger {
    /// Serialized under the key `isMarket`.
    #[serde(rename = "isMarket")]
    pub is_market: bool,
    /// Trigger price as a string; serialized under the key `triggerPx`.
    #[serde(rename = "triggerPx")]
    pub trigger_px: String,
    /// NOTE(review): presumably selects take-profit vs stop-loss — confirm accepted values.
    pub tpsl: String,
}
80
/// Optional builder section of a [`HyperLiquidOrderAction`].
///
/// NOTE(review): field meanings follow the public Hyperliquid API naming and
/// are not established by this file — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HyperLiquidBuilder {
    /// Presumably the builder's address.
    pub b: WalletAddress,
    /// Presumably the builder fee; units not visible here.
    pub f: u32,
}
86
/// A batch of [`HyperLiquidOrder`]s with a type tag and grouping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HyperLiquidOrderAction {
    /// Action type string; serialized under the key `type`.
    #[serde(rename = "type")]
    pub action_type: String,
    /// Orders included in this action.
    pub orders: Vec<HyperLiquidOrder>,
    /// Grouping mode as a free-form string.
    /// NOTE(review): accepted values are not constrained here — confirm.
    pub grouping: String,
    /// Optional builder section; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub builder: Option<HyperLiquidBuilder>,
}
96
/// Payload of [`EngineMessage::MarketAction`]: an action applied to a market.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MarketActionMessage {
    /// Target market; `market.symbol` is the partition key.
    pub market: Market,
    /// The action to apply.
    pub action: MarketAction,
    /// NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm.
    pub timestamp: u64,
}
103
/// Payload of [`EngineMessage::TransactionRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionRequest {
    /// Identifier of this request.
    pub request_id: String,
    /// Wallet the request relates to.
    pub wallet_address: WalletAddress,
    /// Account contract address; its string form is the partition key.
    pub account_contract: WalletAddress,
    /// The directive to submit.
    pub transaction_type: TransactionType,
    /// NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm.
    pub timestamp: u64,
    /// NOTE(review): presumably the time after which the request must not be
    /// processed, in the same unit as `timestamp` — confirm.
    pub expires_at: u64,
}
113
/// A directive as raw bytes together with its signature.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SignedDirectiveTx {
    /// Raw directive bytes. NOTE(review): encoding is not visible here — confirm.
    pub directive: Vec<u8>,
    /// Signature as a string. NOTE(review): presumably hex-encoded — confirm.
    pub signature: String,
}
121
/// Kind of directive carried by a [`TransactionRequest`]; both variants wrap
/// the same [`SignedDirectiveTx`] payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TransactionType {
    /// NOTE(review): presumably a directive signed by the user — confirm.
    UserDirective(SignedDirectiveTx),
    /// NOTE(review): meaning of "RSM" is not visible in this file — confirm.
    RsmDirective(SignedDirectiveTx),
}
127
/// Payload of [`EngineMessage::TransactionUpdate`]: status report for a request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionUpdate {
    /// Identifier of the request this update refers to; also the partition key.
    pub request_id: String,
    /// Current status of the transaction.
    pub status: TransactionStatus,
    /// Transaction hash, when one is available.
    pub tx_hash: Option<String>,
    /// Error description, when one is available.
    pub error: Option<String>,
    /// NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm.
    pub timestamp: u64,
    /// Gas used, when known.
    pub gas_used: Option<u64>,
    /// Gas price as a string, when known. NOTE(review): unit not visible here.
    pub gas_price: Option<String>,
}
138
/// Payload of [`EngineMessage::MmpTriggered`]; partitioned by `<wallet>_<currency>`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MmpTriggeredMessage {
    /// Wallet the trigger applies to.
    pub wallet: WalletAddress,
    /// Currency the trigger applies to.
    pub currency: String,
    /// Free-form description of why the trigger fired.
    pub reason: String,
    /// NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm.
    pub timestamp: u64,
}
146
/// Payload of [`EngineMessage::TierUpdate`]; partitioned by `wallet`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TierUpdateMessage {
    /// Wallet the update applies to.
    pub wallet: WalletAddress,
    /// Margin mode as a free-form string.
    pub margin_mode: String,
    /// NOTE(review): presumably a monotonically increasing version used to
    /// order updates — confirm.
    pub version: i64,
    /// NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm.
    pub timestamp: u64,
}
156
/// Liquidation state of an account.
///
/// Serde uses snake_case names (`healthy`, `pre_liquidation`, ...), which match
/// `as_str`; `Display` prints the PascalCase variant names instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LiquidationStateType {
    /// Serialized as `healthy`.
    Healthy,
    /// Serialized as `pre_liquidation`.
    PreLiquidation,
    /// Serialized as `in_liquidation`.
    InLiquidation,
    /// Serialized as `liquidated`.
    Liquidated,
}
173
174impl std::fmt::Display for LiquidationStateType {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 match self {
177 LiquidationStateType::Healthy => write!(f, "Healthy"),
178 LiquidationStateType::PreLiquidation => write!(f, "PreLiquidation"),
179 LiquidationStateType::InLiquidation => write!(f, "InLiquidation"),
180 LiquidationStateType::Liquidated => write!(f, "Liquidated"),
181 }
182 }
183}
184
185impl LiquidationStateType {
186 pub fn as_str(&self) -> &'static str {
187 match self {
188 Self::Healthy => "healthy",
189 Self::PreLiquidation => "pre_liquidation",
190 Self::InLiquidation => "in_liquidation",
191 Self::Liquidated => "liquidated",
192 }
193 }
194
195 pub fn from_str(s: &str) -> Option<Self> {
197 match s {
198 "Healthy" | "healthy" => Some(Self::Healthy),
199 "PreLiquidation" | "pre_liquidation" => Some(Self::PreLiquidation),
200 "InLiquidation" | "in_liquidation" => Some(Self::InLiquidation),
201 "Liquidated" | "liquidated" => Some(Self::Liquidated),
202 _ => None,
203 }
204 }
205}
206
/// Payload of [`EngineMessage::LiquidationStateChange`]; partitioned by `wallet`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiquidationStateMessage {
    /// Wallet whose liquidation state is reported.
    pub wallet: WalletAddress,
    /// State before the change.
    pub previous_state: LiquidationStateType,
    /// State after the change.
    pub new_state: LiquidationStateType,
    /// Liquidation mode before the change; defaults to `None` when absent and
    /// is omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub previous_liquidation_mode: Option<String>,
    /// Current liquidation mode, if any. Unlike `previous_liquidation_mode`
    /// this is always written, even when `None`.
    pub liquidation_mode: Option<String>,
    /// Margin mode as a free-form string.
    pub margin_mode: String,
    /// Account equity.
    pub equity: Decimal,
    /// NOTE(review): how this differs from `maintenance_margin` is not visible
    /// in this file — confirm against the producer.
    pub mm_required: Decimal,
    /// Maintenance margin figure (see the note on `mm_required`).
    pub maintenance_margin: Decimal,
    /// NOTE(review): presumably the margin deficit — confirm sign convention.
    pub shortfall: Decimal,
    /// Current auction identifier, if any.
    pub auction_id: Option<String>,
    /// Auction identifier before the change; defaults to `None` when absent
    /// and is omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub previous_auction_id: Option<String>,
    /// Defaults to `false` when the field is absent from the payload.
    #[serde(default)]
    pub projection_changed: bool,
    /// Account liquidation status.
    pub status: AccountLiquidationStatus,
    /// NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm.
    pub timestamp: u64,
}
246
/// One leg of an [`RfqFillMessage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RfqFillLeg {
    /// Instrument identifier for this leg.
    pub instrument: String,
    /// Side taken by the taker on this leg.
    pub taker_side: Side,
    /// Fill price for this leg.
    pub price: Decimal,
    /// Fill size for this leg.
    pub size: Decimal,
}
255
/// Payload of [`EngineMessage::RfqFilled`]; partitioned by `rfq_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RfqFillMessage {
    /// Identifier of this fill.
    pub fill_id: String,
    /// Identifier of the RFQ that was filled; also the partition key.
    pub rfq_id: String,
    /// Identifier of the quote that was accepted.
    pub quote_id: String,
    /// Taker's wallet.
    pub taker_wallet: WalletAddress,
    /// NOTE(review): "qp" presumably stands for quote provider — confirm.
    pub qp_wallet: WalletAddress,
    /// Legs making up the fill.
    pub legs: Vec<RfqFillLeg>,
    /// Net premium across the legs. NOTE(review): sign convention is not visible here.
    pub net_premium: Decimal,
    /// Defaults to an empty string when the field is absent from the payload.
    #[serde(default)]
    pub taker_accept_signature: String,
    /// NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm.
    pub timestamp: u64,
}
273
/// Union of every message payload in this module.
///
/// Each variant has its own topic, partition key, type name and wire encoding;
/// see the methods on `EngineMessage`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EngineMessage {
    OrderAction(OrderActionMessage),
    OrderUpdate(OrderUpdateMessage),
    OrderInfo(OrderInfoMessage),
    MarketAction(MarketActionMessage),
    MarketUpdate(MarketUpdateMessage),
    /// A fill together with its accounting; both are written to the wire as
    /// one record with `fill` and `accounting` fields.
    OrderFilled {
        fill: Fill,
        accounting: FillAccounting,
    },
    OrderbookUpdated(OrderbookUpdate),
    L2Update(L2Message),
    Trade(TradeMessage),
    TransactionRequest(TransactionRequest),
    TransactionUpdate(TransactionUpdate),
    MmpTriggered(MmpTriggeredMessage),
    PositionExpired(PositionExpiredMessage),
    TierUpdate(TierUpdateMessage),
    HypercorePositionUpdate(HypercorePositionUpdate),
    LiquidationStateChange(LiquidationStateMessage),
    RfqFilled(RfqFillMessage),
}
300
/// Version byte written as the first byte of every wire-encoded payload;
/// `EngineMessage::deserialize_from_wire` rejects any other leading byte.
pub const WIRE_FORMAT_VERSION: u8 = 1;
304
305pub fn serialize_to_wire_bytes<T: serde::Serialize>(value: &T) -> Vec<u8> {
308 let payload = rmp_serde::to_vec_named(value).expect("msgpack serialize");
309 let mut buf = Vec::with_capacity(1 + payload.len());
310 buf.push(WIRE_FORMAT_VERSION);
311 buf.extend_from_slice(&payload);
312 buf
313}
314
/// Wire representation of [`EngineMessage::OrderFilled`]: the fill and its
/// accounting stored together as one record.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredFillEvent {
    /// The fill itself.
    fill: Fill,
    /// Accounting that accompanies `fill`.
    accounting: FillAccounting,
}
320
321impl EngineMessage {
322 pub fn topic(&self) -> &'static str {
324 match self {
325 EngineMessage::OrderAction(_) => topics::TOPIC_ORDER_ACTIONS,
326 EngineMessage::OrderUpdate(_) => topics::TOPIC_ORDER_UPDATES,
327 EngineMessage::OrderInfo(_) => topics::TOPIC_ORDER_INFOS,
328 EngineMessage::MarketAction(_) => topics::TOPIC_MARKET_ACTIONS,
329 EngineMessage::MarketUpdate(_) => topics::TOPIC_MARKET_UPDATES,
330 EngineMessage::OrderFilled { .. } => topics::TOPIC_FILLS,
331 EngineMessage::OrderbookUpdated(_) => topics::TOPIC_ORDERBOOK_UPDATES,
332 EngineMessage::L2Update(_) => topics::TOPIC_L2_UPDATES,
333 EngineMessage::Trade(_) => topics::TOPIC_TRADES,
334 EngineMessage::TransactionRequest(_) => topics::TOPIC_TRANSACTION_REQUESTS,
335 EngineMessage::TransactionUpdate(_) => topics::TOPIC_TRANSACTION_UPDATES,
336 EngineMessage::MmpTriggered(_) => topics::TOPIC_MMP_TRIGGERS,
337 EngineMessage::PositionExpired(_) => topics::TOPIC_POSITION_EXPIRED,
338 EngineMessage::TierUpdate(_) => topics::TOPIC_TIER_UPDATES,
339 EngineMessage::HypercorePositionUpdate(_) => topics::TOPIC_HYPERCORE_POSITION_UPDATES,
340 EngineMessage::LiquidationStateChange(_) => topics::TOPIC_LIQUIDATION_STATE,
341 EngineMessage::RfqFilled(_) => topics::TOPIC_RFQ_FILLS,
342 }
343 }
344
345 pub fn partition_key(&self) -> Option<String> {
347 match self {
348 EngineMessage::OrderAction(action) => action
349 .info
350 .client_id
351 .clone()
352 .or_else(|| Some(format!("order_{}", action.wallet))),
353 EngineMessage::OrderUpdate(update) => update
354 .order_id
355 .map(|id| id.to_string())
356 .or_else(|| Some(format!("order_{}", update.info.symbol))),
357 EngineMessage::OrderInfo(info) => Some(info.order_id.to_string()),
358 EngineMessage::MarketAction(action) => Some(action.market.symbol.clone()),
359 EngineMessage::MarketUpdate(update) => Some(update.market.symbol.clone()),
360 EngineMessage::OrderFilled { fill, .. } => Some(fill.taker_order_id.to_string()),
361 EngineMessage::OrderbookUpdated(update) => Some(update.symbol.clone()),
362 EngineMessage::L2Update(update) => Some(update.symbol.clone()),
363 EngineMessage::Trade(trade) => Some(trade.symbol.clone()),
364 EngineMessage::TransactionRequest(req) => Some(req.account_contract.to_string()),
365 EngineMessage::TransactionUpdate(update) => Some(update.request_id.clone()),
366 EngineMessage::MmpTriggered(msg) => Some(format!("{}_{}", msg.wallet, msg.currency)),
367 EngineMessage::PositionExpired(msg) => {
368 Some(format!("{}_{}", msg.wallet_address, msg.symbol))
369 }
370 EngineMessage::TierUpdate(msg) => Some(format!("{}", msg.wallet)),
371 EngineMessage::HypercorePositionUpdate(update) => Some(update.account.clone()),
372 EngineMessage::LiquidationStateChange(msg) => Some(format!("{}", msg.wallet)),
373 EngineMessage::RfqFilled(msg) => Some(msg.rfq_id.clone()),
374 }
375 }
376
377 pub fn type_name(&self) -> &'static str {
379 match self {
380 EngineMessage::OrderAction(_) => "OrderAction",
381 EngineMessage::OrderUpdate(_) => "OrderUpdate",
382 EngineMessage::OrderInfo(_) => "OrderInfo",
383 EngineMessage::MarketAction(_) => "MarketAction",
384 EngineMessage::MarketUpdate(_) => "MarketUpdate",
385 EngineMessage::OrderFilled { .. } => "OrderFilled",
386 EngineMessage::OrderbookUpdated(_) => "OrderbookUpdated",
387 EngineMessage::L2Update(_) => "L2Update",
388 EngineMessage::Trade(_) => "Trade",
389 EngineMessage::TransactionRequest(_) => "TransactionRequest",
390 EngineMessage::TransactionUpdate(_) => "TransactionUpdate",
391 EngineMessage::MmpTriggered(_) => "MmpTriggered",
392 EngineMessage::PositionExpired(_) => "PositionExpired",
393 EngineMessage::TierUpdate(_) => "TierUpdate",
394 EngineMessage::HypercorePositionUpdate(_) => "HypercorePositionUpdate",
395 EngineMessage::LiquidationStateChange(_) => "LiquidationStateChange",
396 EngineMessage::RfqFilled(_) => "RfqFilled",
397 }
398 }
399
400 pub fn deserialize_from_wire(topic: &str, data: &[u8]) -> Result<Self, String> {
405 if data.is_empty() {
406 return Err("empty wire data".to_string());
407 }
408 let version = data[0];
409 if version != WIRE_FORMAT_VERSION {
410 return Err(format!("unsupported wire version: {}", version));
411 }
412 let payload = &data[1..];
413 match topic {
414 topics::TOPIC_ORDER_ACTIONS => rmp_serde::from_slice(payload)
415 .map(EngineMessage::OrderAction)
416 .map_err(|e| format!("deserialize OrderAction: {}", e)),
417 topics::TOPIC_ORDER_UPDATES => rmp_serde::from_slice(payload)
418 .map(EngineMessage::OrderUpdate)
419 .map_err(|e| format!("deserialize OrderUpdate: {}", e)),
420 topics::TOPIC_ORDER_INFOS => rmp_serde::from_slice(payload)
421 .map(EngineMessage::OrderInfo)
422 .map_err(|e| format!("deserialize OrderInfo: {}", e)),
423 topics::TOPIC_MARKET_ACTIONS => rmp_serde::from_slice(payload)
424 .map(EngineMessage::MarketAction)
425 .map_err(|e| format!("deserialize MarketAction: {}", e)),
426 topics::TOPIC_MARKET_UPDATES => rmp_serde::from_slice(payload)
427 .map(EngineMessage::MarketUpdate)
428 .map_err(|e| format!("deserialize MarketUpdate: {}", e)),
429 topics::TOPIC_FILLS => rmp_serde::from_slice::<StoredFillEvent>(payload)
430 .map(|stored| EngineMessage::OrderFilled {
431 fill: stored.fill,
432 accounting: stored.accounting,
433 })
434 .or_else(|stored_err| {
435 rmp_serde::from_slice::<Fill>(payload)
436 .map(|fill| EngineMessage::OrderFilled {
437 accounting: FillAccounting::from_fill(&fill),
438 fill,
439 })
440 .map_err(|fill_err| {
441 format!(
442 "deserialize Fill or StoredFillEvent: {}; {}",
443 stored_err, fill_err
444 )
445 })
446 }),
447 topics::TOPIC_ORDERBOOK_UPDATES => rmp_serde::from_slice(payload)
448 .map(EngineMessage::OrderbookUpdated)
449 .map_err(|e| format!("deserialize OrderbookUpdate: {}", e)),
450 topics::TOPIC_L2_UPDATES => rmp_serde::from_slice(payload)
451 .map(EngineMessage::L2Update)
452 .map_err(|e| format!("deserialize L2Message: {}", e)),
453 topics::TOPIC_TRADES => rmp_serde::from_slice(payload)
454 .map(EngineMessage::Trade)
455 .map_err(|e| format!("deserialize TradeMessage: {}", e)),
456 topics::TOPIC_TRANSACTION_REQUESTS => rmp_serde::from_slice(payload)
457 .map(EngineMessage::TransactionRequest)
458 .map_err(|e| format!("deserialize TransactionRequest: {}", e)),
459 topics::TOPIC_TRANSACTION_UPDATES => rmp_serde::from_slice(payload)
460 .map(EngineMessage::TransactionUpdate)
461 .map_err(|e| format!("deserialize TransactionUpdate: {}", e)),
462 topics::TOPIC_MMP_TRIGGERS => rmp_serde::from_slice(payload)
463 .map(EngineMessage::MmpTriggered)
464 .map_err(|e| format!("deserialize MmpTriggeredMessage: {}", e)),
465 topics::TOPIC_POSITION_EXPIRED => rmp_serde::from_slice(payload)
466 .map(EngineMessage::PositionExpired)
467 .map_err(|e| format!("deserialize PositionExpiredMessage: {}", e)),
468 topics::TOPIC_TIER_UPDATES => rmp_serde::from_slice(payload)
469 .map(EngineMessage::TierUpdate)
470 .map_err(|e| format!("deserialize TierUpdateMessage: {}", e)),
471 topics::TOPIC_HYPERCORE_POSITION_UPDATES => rmp_serde::from_slice(payload)
472 .map(EngineMessage::HypercorePositionUpdate)
473 .map_err(|e| format!("deserialize HypercorePositionUpdate: {}", e)),
474 topics::TOPIC_LIQUIDATION_STATE => rmp_serde::from_slice(payload)
475 .map(EngineMessage::LiquidationStateChange)
476 .map_err(|e| format!("deserialize LiquidationStateMessage: {}", e)),
477 topics::TOPIC_RFQ_FILLS => rmp_serde::from_slice(payload)
478 .map(EngineMessage::RfqFilled)
479 .map_err(|e| format!("deserialize RfqFillMessage: {}", e)),
480 _ => Err(format!("unknown topic: {}", topic)),
481 }
482 }
483
484 pub fn serialize_inner(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
489 let payload = match self {
490 EngineMessage::OrderAction(inner) => rmp_serde::to_vec_named(inner)?,
491 EngineMessage::OrderUpdate(inner) => rmp_serde::to_vec_named(inner)?,
492 EngineMessage::OrderInfo(inner) => rmp_serde::to_vec_named(inner)?,
493 EngineMessage::MarketAction(inner) => rmp_serde::to_vec_named(inner)?,
494 EngineMessage::MarketUpdate(inner) => rmp_serde::to_vec_named(inner)?,
495 EngineMessage::OrderFilled { fill, accounting } => {
496 rmp_serde::to_vec_named(&StoredFillEvent {
497 fill: fill.clone(),
498 accounting: *accounting,
499 })?
500 }
501 EngineMessage::OrderbookUpdated(inner) => rmp_serde::to_vec_named(inner)?,
502 EngineMessage::L2Update(inner) => rmp_serde::to_vec_named(inner)?,
503 EngineMessage::Trade(inner) => rmp_serde::to_vec_named(inner)?,
504 EngineMessage::TransactionRequest(inner) => rmp_serde::to_vec_named(inner)?,
505 EngineMessage::TransactionUpdate(inner) => rmp_serde::to_vec_named(inner)?,
506 EngineMessage::MmpTriggered(inner) => rmp_serde::to_vec_named(inner)?,
507 EngineMessage::PositionExpired(inner) => rmp_serde::to_vec_named(inner)?,
508 EngineMessage::TierUpdate(inner) => rmp_serde::to_vec_named(inner)?,
509 EngineMessage::HypercorePositionUpdate(inner) => rmp_serde::to_vec_named(inner)?,
510 EngineMessage::LiquidationStateChange(inner) => rmp_serde::to_vec_named(inner)?,
511 EngineMessage::RfqFilled(inner) => rmp_serde::to_vec_named(inner)?,
512 };
513 let mut buf = Vec::with_capacity(1 + payload.len());
514 buf.push(WIRE_FORMAT_VERSION);
515 buf.extend_from_slice(&payload);
516 Ok(buf)
517 }
518}
519
#[cfg(test)]
mod tests {
    use super::*;
    use crate::wallet_address::test_wallet;
    use rust_decimal_macros::dec;

    /// Position-expired payload shape that has no `margin_mode` field.
    #[derive(Debug, Serialize, Deserialize)]
    struct OldPositionExpiredMessage {
        wallet_address: WalletAddress,
        symbol: String,
        position_size: Decimal,
        settlement_price: Decimal,
        settlement_value: Decimal,
        timestamp: u64,
    }

    /// Position-expired payload shape that has `margin_mode` but none of the
    /// optional economics fields.
    #[derive(Debug, Serialize, Deserialize)]
    struct PositionExpiredMessageWithoutEconomics {
        wallet_address: WalletAddress,
        margin_mode: crate::MarginMode,
        symbol: String,
        position_size: Decimal,
        settlement_price: Decimal,
        settlement_value: Decimal,
        timestamp: u64,
    }

    /// Field-by-field comparison of the fill fields checked by the wire tests.
    /// `#[track_caller]` makes a failure point at the calling test.
    #[track_caller]
    fn assert_fill_fields_eq(actual: &Fill, expected: &Fill) {
        assert_eq!(actual.trade_id, expected.trade_id);
        assert_eq!(actual.taker_order_id, expected.taker_order_id);
        assert_eq!(actual.maker_order_id, expected.maker_order_id);
        assert_eq!(actual.symbol, expected.symbol);
        assert_eq!(actual.price, expected.price);
        assert_eq!(actual.size, expected.size);
        assert_eq!(actual.taker_side, expected.taker_side);
        assert_eq!(actual.taker_wallet_address, expected.taker_wallet_address);
        assert_eq!(actual.maker_wallet_address, expected.maker_wallet_address);
        assert_eq!(actual.taker_realized_pnl, expected.taker_realized_pnl);
        assert_eq!(actual.maker_realized_pnl, expected.maker_realized_pnl);
        assert_eq!(actual.underlying_notional, expected.underlying_notional);
    }

    /// A payload without `margin_mode` must be rejected, not defaulted.
    #[test]
    fn position_expired_rejects_payload_without_margin_mode() {
        let legacy = OldPositionExpiredMessage {
            wallet_address: test_wallet(42),
            symbol: "BTC-20250115-100000-C".to_string(),
            position_size: dec!(1.25),
            settlement_price: dec!(5000),
            settlement_value: dec!(6250),
            timestamp: 1736899200000,
        };
        let encoded = rmp_serde::to_vec_named(&legacy).expect("serialize old payload");

        let err = rmp_serde::from_slice::<PositionExpiredMessage>(&encoded)
            .expect_err("missing margin_mode must fail deserialization");

        let rendered = err.to_string();
        assert!(
            rendered.contains("missing field `margin_mode`"),
            "unexpected error: {err}"
        );
    }

    /// Missing economics fields decode as `None` while everything else
    /// survives unchanged.
    #[test]
    fn position_expired_deserializes_missing_economics_as_none() {
        let source = PositionExpiredMessageWithoutEconomics {
            wallet_address: test_wallet(42),
            margin_mode: crate::MarginMode::Standard,
            symbol: "BTC-20250115-100000-C".to_string(),
            position_size: dec!(1.25),
            settlement_price: dec!(5000),
            settlement_value: dec!(6250),
            timestamp: 1736899200000,
        };
        let encoded = rmp_serde::to_vec_named(&source).expect("serialize payload");

        let decoded: PositionExpiredMessage =
            rmp_serde::from_slice(&encoded).expect("deserialize into current payload");

        assert_eq!(decoded.wallet_address, source.wallet_address);
        assert_eq!(decoded.margin_mode, source.margin_mode);
        assert_eq!(decoded.symbol, source.symbol);
        assert_eq!(decoded.position_size, source.position_size);
        assert_eq!(decoded.settlement_price, source.settlement_price);
        assert_eq!(decoded.settlement_value, source.settlement_value);
        assert_eq!(decoded.timestamp, source.timestamp);
        assert_eq!(decoded.settlement_entry_price, None);
        assert_eq!(decoded.cost_basis, None);
        assert_eq!(decoded.net_pnl, None);
    }

    /// `serialize_inner` followed by `deserialize_from_wire` keeps both the
    /// fill and the explicitly supplied accounting.
    #[test]
    fn order_filled_wire_round_trip_preserves_accounting() {
        let fill = Fill {
            trade_id: 7,
            taker_order_id: 11,
            maker_order_id: 12,
            symbol: "BTC-20261231-100000-C".to_string(),
            price: dec!(250),
            size: dec!(100000000),
            taker_side: Side::Buy,
            taker_wallet_address: test_wallet(1),
            maker_wallet_address: test_wallet(2),
            fee: Decimal::ZERO,
            is_taker: true,
            timestamp: 1_700_000_000_000,
            builder_code_address: None,
            builder_code_fee: None,
            source: crate::FillSource::Orderbook,
            taker_realized_pnl: Some(dec!(10)),
            maker_realized_pnl: Some(dec!(-10)),
            underlying_notional: Some(dec!(10000000)),
        };
        let accounting = FillAccounting {
            trade_id: fill.trade_id,
            taker_realized_pnl: dec!(10),
            maker_realized_pnl: dec!(-10),
            taker_premium_delta: dec!(-250),
            maker_premium_delta: dec!(250),
            taker_net_cash_delta: dec!(-240),
            maker_net_cash_delta: dec!(240),
        };
        let message = EngineMessage::OrderFilled {
            fill: fill.clone(),
            accounting,
        };

        let wire = message.serialize_inner().expect("serialize order filled");
        let decoded = EngineMessage::deserialize_from_wire(topics::TOPIC_FILLS, &wire)
            .expect("deserialize order filled");

        let (decoded_fill, decoded_accounting) = match decoded {
            EngineMessage::OrderFilled { fill, accounting } => (fill, accounting),
            _ => panic!("expected OrderFilled"),
        };
        assert_fill_fields_eq(&decoded_fill, &fill);
        assert_eq!(decoded_accounting, accounting);
    }

    /// A frame containing only a `Fill` still decodes, with the accounting
    /// derived from the fill.
    #[test]
    fn order_filled_deserializes_legacy_fill_payload() {
        let fill = Fill {
            trade_id: 8,
            taker_order_id: 21,
            maker_order_id: 22,
            symbol: "ETH-PERP".to_string(),
            price: dec!(3000),
            size: dec!(1000000),
            taker_side: Side::Sell,
            taker_wallet_address: test_wallet(3),
            maker_wallet_address: test_wallet(4),
            fee: Decimal::ZERO,
            is_taker: true,
            timestamp: 1_700_000_000_001,
            builder_code_address: None,
            builder_code_fee: None,
            source: crate::FillSource::Orderbook,
            taker_realized_pnl: Some(dec!(25)),
            maker_realized_pnl: Some(dec!(-25)),
            underlying_notional: None,
        };

        // Hand-built frame: version byte followed by the bare fill.
        let mut wire = vec![WIRE_FORMAT_VERSION];
        wire.extend(rmp_serde::to_vec_named(&fill).expect("serialize legacy fill"));

        let decoded = EngineMessage::deserialize_from_wire(topics::TOPIC_FILLS, &wire)
            .expect("deserialize legacy fill");
        let (decoded_fill, derived_accounting) = match decoded {
            EngineMessage::OrderFilled { fill, accounting } => (fill, accounting),
            _ => panic!("expected OrderFilled"),
        };

        assert_fill_fields_eq(&decoded_fill, &fill);
        assert_eq!(decoded_fill.underlying_notional, None);
        assert_eq!(derived_accounting, FillAccounting::from_fill(&fill));
    }
}