Skip to main content

hypercall_types/
engine_messages.rs

1//! Engine message types for the event bus.
2//!
3//! All types that flow through the EngineMessage enum, plus serialization helpers.
4
5use crate::fill_accounting::FillAccounting;
6use crate::hypercore_position::HypercorePositionUpdate;
7use crate::liquidation_state::AccountLiquidationStatus;
8use crate::settlement_events::PositionExpiredMessage;
9use crate::topics;
10use crate::{
11    Fill, L2Message, Market, MarketUpdateMessage, OrderAction, OrderInfo, OrderUpdateMessage,
12    OrderbookUpdate, Side, TradeMessage, WalletAddress,
13};
14use rust_decimal::Decimal;
15use serde::{Deserialize, Serialize};
16
17// Re-export MarketAction from enums for backward compatibility
18// (previously defined locally in messages.rs, now canonical in enums.rs).
19pub use crate::enums::MarketAction;
20// Re-export TransactionStatus from enums for backward compatibility.
21pub use crate::enums::TransactionStatus;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct OrderActionMessage {
25    pub timestamp: u64,
26    pub info: OrderInfo,
27    pub action: OrderAction,
28    pub wallet: WalletAddress,
29    pub api_wallet_address: Option<WalletAddress>, // Recovered from signature for perp orders
30    #[serde(default)]
31    pub mmp_triggered: bool,  // True when this cancel action was auto-generated by MMP system
32    /// Unique request ID for command correlation and tracing.
33    /// Generated at API boundary for external requests, or internally for engine-triggered actions.
34    /// Uses serde(default) for backward compatibility with historical messages.
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub request_id: Option<String>,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct OrderInfoMessage {
41    pub timestamp: u64,
42    pub order_id: u64,
43    pub wallet: WalletAddress,
44    pub info: OrderInfo,
45}
46
47// HyperLiquid order format - matches their API exactly from https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/exchange-endpoint#place-an-order
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct HyperLiquidOrder {
50    pub a: u32,                  // asset
51    pub b: bool,                 // isBuy
52    pub p: String,               // price
53    pub s: String,               // size
54    pub r: bool,                 // reduceOnly
55    pub t: HyperLiquidOrderType, // type
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub c: Option<String>, // cloid (client order id)
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
61#[serde(untagged)]
62pub enum HyperLiquidOrderType {
63    Limit { limit: HyperLiquidTif },
64    Trigger { trigger: HyperLiquidTrigger },
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct HyperLiquidTif {
69    pub tif: String, // "Alo" | "Ioc" | "Gtc"
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct HyperLiquidTrigger {
74    #[serde(rename = "isMarket")]
75    pub is_market: bool,
76    #[serde(rename = "triggerPx")]
77    pub trigger_px: String,
78    pub tpsl: String, // "tp" | "sl"
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct HyperLiquidBuilder {
83    pub b: WalletAddress, // builder address
84    pub f: u32,           // fee in tenths of basis point
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct HyperLiquidOrderAction {
89    #[serde(rename = "type")]
90    pub action_type: String, // "order"
91    pub orders: Vec<HyperLiquidOrder>,
92    pub grouping: String, // "na" | "normalTpsl" | "positionTpsl"
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub builder: Option<HyperLiquidBuilder>,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct MarketActionMessage {
99    pub market: Market,
100    pub action: MarketAction,
101    pub timestamp: u64,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct TransactionRequest {
106    pub request_id: String,              // Unique ID for tracking
107    pub wallet_address: WalletAddress,   // User's wallet
108    pub account_contract: WalletAddress, // User's account contract address
109    pub transaction_type: TransactionType,
110    pub timestamp: u64,
111    pub expires_at: u64, // When this request expires
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct SignedDirectiveTx {
116    /// ABI-encoded directive payload bytes.
117    pub directive: Vec<u8>,
118    /// Directive signature (hex string, 0x-prefixed).
119    pub signature: String,
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub enum TransactionType {
124    UserDirective(SignedDirectiveTx),
125    RsmDirective(SignedDirectiveTx),
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct TransactionUpdate {
130    pub request_id: String,
131    pub status: TransactionStatus,
132    pub tx_hash: Option<String>,
133    pub error: Option<String>,
134    pub timestamp: u64,
135    pub gas_used: Option<u64>,
136    pub gas_price: Option<String>,
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
140pub struct MmpTriggeredMessage {
141    pub wallet: WalletAddress,
142    pub currency: String,
143    pub reason: String, // "qty_limit" | "delta_limit" | "vega_limit"
144    pub timestamp: u64,
145}
146
147/// Margin mode update message for cross-process cache synchronization.
148#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct TierUpdateMessage {
150    pub wallet: WalletAddress,
151    pub margin_mode: String, // "standard" or "portfolio"
152    /// Version for ordering updates (higher = newer)
153    pub version: i64,
154    pub timestamp: u64,
155}
156
157/// Liquidation state type for messages.
158///
159/// A simple enum representing the liquidation state without associated data.
160/// Used in `LiquidationStateMessage` for event bus messages.
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
162#[serde(rename_all = "snake_case")]
163pub enum LiquidationStateType {
164    /// Account is healthy (above maintenance margin).
165    Healthy,
166    /// Account is below maintenance margin; risk-increasing orders blocked.
167    PreLiquidation,
168    /// Liquidation auction is in progress.
169    InLiquidation,
170    /// Liquidation completed.
171    Liquidated,
172}
173
174impl std::fmt::Display for LiquidationStateType {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        match self {
177            LiquidationStateType::Healthy => write!(f, "Healthy"),
178            LiquidationStateType::PreLiquidation => write!(f, "PreLiquidation"),
179            LiquidationStateType::InLiquidation => write!(f, "InLiquidation"),
180            LiquidationStateType::Liquidated => write!(f, "Liquidated"),
181        }
182    }
183}
184
185impl LiquidationStateType {
186    pub fn as_str(&self) -> &'static str {
187        match self {
188            Self::Healthy => "healthy",
189            Self::PreLiquidation => "pre_liquidation",
190            Self::InLiquidation => "in_liquidation",
191            Self::Liquidated => "liquidated",
192        }
193    }
194
195    /// Convert from string representation (for backwards compatibility).
196    pub fn from_str(s: &str) -> Option<Self> {
197        match s {
198            "Healthy" | "healthy" => Some(Self::Healthy),
199            "PreLiquidation" | "pre_liquidation" => Some(Self::PreLiquidation),
200            "InLiquidation" | "in_liquidation" => Some(Self::InLiquidation),
201            "Liquidated" | "liquidated" => Some(Self::Liquidated),
202            _ => None,
203        }
204    }
205}
206
207/// Liquidation state change message.
208///
209/// Published when an account's liquidation state changes (e.g., Healthy -> PreLiquidation).
210#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct LiquidationStateMessage {
212    /// Wallet address.
213    pub wallet: WalletAddress,
214    /// Previous state.
215    pub previous_state: LiquidationStateType,
216    /// New state.
217    pub new_state: LiquidationStateType,
218    /// Previous liquidation mode, when a liquidation flow was already active.
219    #[serde(default, skip_serializing_if = "Option::is_none")]
220    pub previous_liquidation_mode: Option<String>,
221    /// Current liquidation mode, when a liquidation flow is active.
222    pub liquidation_mode: Option<String>,
223    /// Margin mode (standard or portfolio).
224    pub margin_mode: String,
225    /// Current equity.
226    pub equity: Decimal,
227    /// Maintenance margin requirement.
228    pub mm_required: Decimal,
229    /// Equity minus maintenance margin requirement.
230    pub maintenance_margin: Decimal,
231    /// Shortfall amount (positive if underwater).
232    pub shortfall: Decimal,
233    /// Auction ID (if in liquidation).
234    pub auction_id: Option<String>,
235    /// Previous auction ID, if the prior state already referenced one.
236    #[serde(default, skip_serializing_if = "Option::is_none")]
237    pub previous_auction_id: Option<String>,
238    /// Whether the liquidation projection changed materially, even if the top-level state stayed the same.
239    #[serde(default)]
240    pub projection_changed: bool,
241    /// Full restart-safe liquidation status snapshot at the time of transition.
242    pub status: AccountLiquidationStatus,
243    /// Timestamp when this change occurred (millis).
244    pub timestamp: u64,
245}
246
247/// RFQ fill leg details.
248#[derive(Debug, Clone, Serialize, Deserialize)]
249pub struct RfqFillLeg {
250    pub instrument: String,
251    pub taker_side: Side,
252    pub price: Decimal,
253    pub size: Decimal,
254}
255
256/// RFQ fill event emitted after successful RFQ execution.
257#[derive(Debug, Clone, Serialize, Deserialize)]
258pub struct RfqFillMessage {
259    pub fill_id: String,
260    pub rfq_id: String,
261    pub quote_id: String,
262    pub taker_wallet: WalletAddress,
263    pub qp_wallet: WalletAddress,
264    pub legs: Vec<RfqFillLeg>,
265    pub net_premium: Decimal,
266    /// Taker's EIP-712 signature over `AcceptRFQQuote`. Persisted into
267    /// `rfq_fills.taker_accept_signature` so the durable audit trail ties
268    /// the executed fill back to the exact signed acceptance payload.
269    #[serde(default)]
270    pub taker_accept_signature: String,
271    pub timestamp: u64,
272}
273
274#[derive(Debug, Clone, Serialize, Deserialize)]
275pub enum EngineMessage {
276    OrderAction(OrderActionMessage),
277    OrderUpdate(OrderUpdateMessage),
278    OrderInfo(OrderInfoMessage),
279    MarketAction(MarketActionMessage),
280    MarketUpdate(MarketUpdateMessage),
281    OrderFilled {
282        fill: Fill,
283        accounting: FillAccounting,
284    },
285    OrderbookUpdated(OrderbookUpdate),
286    L2Update(L2Message),
287    Trade(TradeMessage),
288    TransactionRequest(TransactionRequest),
289    TransactionUpdate(TransactionUpdate),
290    MmpTriggered(MmpTriggeredMessage),
291    PositionExpired(PositionExpiredMessage),
292    /// Tier/margin mode update for cross-process cache sync.
293    TierUpdate(TierUpdateMessage),
294    HypercorePositionUpdate(HypercorePositionUpdate),
295    /// Liquidation state change notification.
296    LiquidationStateChange(LiquidationStateMessage),
297    /// RFQ fill event.
298    RfqFilled(RfqFillMessage),
299}
300
301/// Current engine event wire format version.
302/// Version 1 = MessagePack with named fields (rmp-serde to_vec_named).
303pub const WIRE_FORMAT_VERSION: u8 = 1;
304
305/// Serialize any value to wire-format bytes: [version byte][msgpack payload].
306/// Uses the same format as `EngineMessage::serialize_inner` and engine_events.
307pub fn serialize_to_wire_bytes<T: serde::Serialize>(value: &T) -> Vec<u8> {
308    let payload = rmp_serde::to_vec_named(value).expect("msgpack serialize");
309    let mut buf = Vec::with_capacity(1 + payload.len());
310    buf.push(WIRE_FORMAT_VERSION);
311    buf.extend_from_slice(&payload);
312    buf
313}
314
315#[derive(Debug, Clone, Serialize, Deserialize)]
316struct StoredFillEvent {
317    fill: Fill,
318    accounting: FillAccounting,
319}
320
321impl EngineMessage {
322    /// Get the event topic for this message type.
323    pub fn topic(&self) -> &'static str {
324        match self {
325            EngineMessage::OrderAction(_) => topics::TOPIC_ORDER_ACTIONS,
326            EngineMessage::OrderUpdate(_) => topics::TOPIC_ORDER_UPDATES,
327            EngineMessage::OrderInfo(_) => topics::TOPIC_ORDER_INFOS,
328            EngineMessage::MarketAction(_) => topics::TOPIC_MARKET_ACTIONS,
329            EngineMessage::MarketUpdate(_) => topics::TOPIC_MARKET_UPDATES,
330            EngineMessage::OrderFilled { .. } => topics::TOPIC_FILLS,
331            EngineMessage::OrderbookUpdated(_) => topics::TOPIC_ORDERBOOK_UPDATES,
332            EngineMessage::L2Update(_) => topics::TOPIC_L2_UPDATES,
333            EngineMessage::Trade(_) => topics::TOPIC_TRADES,
334            EngineMessage::TransactionRequest(_) => topics::TOPIC_TRANSACTION_REQUESTS,
335            EngineMessage::TransactionUpdate(_) => topics::TOPIC_TRANSACTION_UPDATES,
336            EngineMessage::MmpTriggered(_) => topics::TOPIC_MMP_TRIGGERS,
337            EngineMessage::PositionExpired(_) => topics::TOPIC_POSITION_EXPIRED,
338            EngineMessage::TierUpdate(_) => topics::TOPIC_TIER_UPDATES,
339            EngineMessage::HypercorePositionUpdate(_) => topics::TOPIC_HYPERCORE_POSITION_UPDATES,
340            EngineMessage::LiquidationStateChange(_) => topics::TOPIC_LIQUIDATION_STATE,
341            EngineMessage::RfqFilled(_) => topics::TOPIC_RFQ_FILLS,
342        }
343    }
344
345    /// Get the partition key (used for message ordering within a partition).
346    pub fn partition_key(&self) -> Option<String> {
347        match self {
348            EngineMessage::OrderAction(action) => action
349                .info
350                .client_id
351                .clone()
352                .or_else(|| Some(format!("order_{}", action.wallet))),
353            EngineMessage::OrderUpdate(update) => update
354                .order_id
355                .map(|id| id.to_string())
356                .or_else(|| Some(format!("order_{}", update.info.symbol))),
357            EngineMessage::OrderInfo(info) => Some(info.order_id.to_string()),
358            EngineMessage::MarketAction(action) => Some(action.market.symbol.clone()),
359            EngineMessage::MarketUpdate(update) => Some(update.market.symbol.clone()),
360            EngineMessage::OrderFilled { fill, .. } => Some(fill.taker_order_id.to_string()),
361            EngineMessage::OrderbookUpdated(update) => Some(update.symbol.clone()),
362            EngineMessage::L2Update(update) => Some(update.symbol.clone()),
363            EngineMessage::Trade(trade) => Some(trade.symbol.clone()),
364            EngineMessage::TransactionRequest(req) => Some(req.account_contract.to_string()),
365            EngineMessage::TransactionUpdate(update) => Some(update.request_id.clone()),
366            EngineMessage::MmpTriggered(msg) => Some(format!("{}_{}", msg.wallet, msg.currency)),
367            EngineMessage::PositionExpired(msg) => {
368                Some(format!("{}_{}", msg.wallet_address, msg.symbol))
369            }
370            EngineMessage::TierUpdate(msg) => Some(format!("{}", msg.wallet)),
371            EngineMessage::HypercorePositionUpdate(update) => Some(update.account.clone()),
372            EngineMessage::LiquidationStateChange(msg) => Some(format!("{}", msg.wallet)),
373            EngineMessage::RfqFilled(msg) => Some(msg.rfq_id.clone()),
374        }
375    }
376
377    /// Get the event type name (for logging/metrics).
378    pub fn type_name(&self) -> &'static str {
379        match self {
380            EngineMessage::OrderAction(_) => "OrderAction",
381            EngineMessage::OrderUpdate(_) => "OrderUpdate",
382            EngineMessage::OrderInfo(_) => "OrderInfo",
383            EngineMessage::MarketAction(_) => "MarketAction",
384            EngineMessage::MarketUpdate(_) => "MarketUpdate",
385            EngineMessage::OrderFilled { .. } => "OrderFilled",
386            EngineMessage::OrderbookUpdated(_) => "OrderbookUpdated",
387            EngineMessage::L2Update(_) => "L2Update",
388            EngineMessage::Trade(_) => "Trade",
389            EngineMessage::TransactionRequest(_) => "TransactionRequest",
390            EngineMessage::TransactionUpdate(_) => "TransactionUpdate",
391            EngineMessage::MmpTriggered(_) => "MmpTriggered",
392            EngineMessage::PositionExpired(_) => "PositionExpired",
393            EngineMessage::TierUpdate(_) => "TierUpdate",
394            EngineMessage::HypercorePositionUpdate(_) => "HypercorePositionUpdate",
395            EngineMessage::LiquidationStateChange(_) => "LiquidationStateChange",
396            EngineMessage::RfqFilled(_) => "RfqFilled",
397        }
398    }
399
400    /// Deserialize wire-format bytes (version byte + msgpack) back into an EngineMessage.
401    ///
402    /// The topic string determines which inner type to deserialize into (inverse of `serialize_inner`).
403    /// Wire format: [version: u8][msgpack payload], where version must be 1.
404    pub fn deserialize_from_wire(topic: &str, data: &[u8]) -> Result<Self, String> {
405        if data.is_empty() {
406            return Err("empty wire data".to_string());
407        }
408        let version = data[0];
409        if version != WIRE_FORMAT_VERSION {
410            return Err(format!("unsupported wire version: {}", version));
411        }
412        let payload = &data[1..];
413        match topic {
414            topics::TOPIC_ORDER_ACTIONS => rmp_serde::from_slice(payload)
415                .map(EngineMessage::OrderAction)
416                .map_err(|e| format!("deserialize OrderAction: {}", e)),
417            topics::TOPIC_ORDER_UPDATES => rmp_serde::from_slice(payload)
418                .map(EngineMessage::OrderUpdate)
419                .map_err(|e| format!("deserialize OrderUpdate: {}", e)),
420            topics::TOPIC_ORDER_INFOS => rmp_serde::from_slice(payload)
421                .map(EngineMessage::OrderInfo)
422                .map_err(|e| format!("deserialize OrderInfo: {}", e)),
423            topics::TOPIC_MARKET_ACTIONS => rmp_serde::from_slice(payload)
424                .map(EngineMessage::MarketAction)
425                .map_err(|e| format!("deserialize MarketAction: {}", e)),
426            topics::TOPIC_MARKET_UPDATES => rmp_serde::from_slice(payload)
427                .map(EngineMessage::MarketUpdate)
428                .map_err(|e| format!("deserialize MarketUpdate: {}", e)),
429            topics::TOPIC_FILLS => rmp_serde::from_slice::<StoredFillEvent>(payload)
430                .map(|stored| EngineMessage::OrderFilled {
431                    fill: stored.fill,
432                    accounting: stored.accounting,
433                })
434                .or_else(|stored_err| {
435                    rmp_serde::from_slice::<Fill>(payload)
436                        .map(|fill| EngineMessage::OrderFilled {
437                            accounting: FillAccounting::from_fill(&fill),
438                            fill,
439                        })
440                        .map_err(|fill_err| {
441                            format!(
442                                "deserialize Fill or StoredFillEvent: {}; {}",
443                                stored_err, fill_err
444                            )
445                        })
446                }),
447            topics::TOPIC_ORDERBOOK_UPDATES => rmp_serde::from_slice(payload)
448                .map(EngineMessage::OrderbookUpdated)
449                .map_err(|e| format!("deserialize OrderbookUpdate: {}", e)),
450            topics::TOPIC_L2_UPDATES => rmp_serde::from_slice(payload)
451                .map(EngineMessage::L2Update)
452                .map_err(|e| format!("deserialize L2Message: {}", e)),
453            topics::TOPIC_TRADES => rmp_serde::from_slice(payload)
454                .map(EngineMessage::Trade)
455                .map_err(|e| format!("deserialize TradeMessage: {}", e)),
456            topics::TOPIC_TRANSACTION_REQUESTS => rmp_serde::from_slice(payload)
457                .map(EngineMessage::TransactionRequest)
458                .map_err(|e| format!("deserialize TransactionRequest: {}", e)),
459            topics::TOPIC_TRANSACTION_UPDATES => rmp_serde::from_slice(payload)
460                .map(EngineMessage::TransactionUpdate)
461                .map_err(|e| format!("deserialize TransactionUpdate: {}", e)),
462            topics::TOPIC_MMP_TRIGGERS => rmp_serde::from_slice(payload)
463                .map(EngineMessage::MmpTriggered)
464                .map_err(|e| format!("deserialize MmpTriggeredMessage: {}", e)),
465            topics::TOPIC_POSITION_EXPIRED => rmp_serde::from_slice(payload)
466                .map(EngineMessage::PositionExpired)
467                .map_err(|e| format!("deserialize PositionExpiredMessage: {}", e)),
468            topics::TOPIC_TIER_UPDATES => rmp_serde::from_slice(payload)
469                .map(EngineMessage::TierUpdate)
470                .map_err(|e| format!("deserialize TierUpdateMessage: {}", e)),
471            topics::TOPIC_HYPERCORE_POSITION_UPDATES => rmp_serde::from_slice(payload)
472                .map(EngineMessage::HypercorePositionUpdate)
473                .map_err(|e| format!("deserialize HypercorePositionUpdate: {}", e)),
474            topics::TOPIC_LIQUIDATION_STATE => rmp_serde::from_slice(payload)
475                .map(EngineMessage::LiquidationStateChange)
476                .map_err(|e| format!("deserialize LiquidationStateMessage: {}", e)),
477            topics::TOPIC_RFQ_FILLS => rmp_serde::from_slice(payload)
478                .map(EngineMessage::RfqFilled)
479                .map_err(|e| format!("deserialize RfqFillMessage: {}", e)),
480            _ => Err(format!("unknown topic: {}", topic)),
481        }
482    }
483
484    /// Serialize just the inner type (not the enum wrapper) as MessagePack with version byte.
485    ///
486    /// Wire format: [version: u8][msgpack payload]
487    /// Version 1 = MessagePack with named fields (rmp-serde to_vec_named).
488    pub fn serialize_inner(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
489        let payload = match self {
490            EngineMessage::OrderAction(inner) => rmp_serde::to_vec_named(inner)?,
491            EngineMessage::OrderUpdate(inner) => rmp_serde::to_vec_named(inner)?,
492            EngineMessage::OrderInfo(inner) => rmp_serde::to_vec_named(inner)?,
493            EngineMessage::MarketAction(inner) => rmp_serde::to_vec_named(inner)?,
494            EngineMessage::MarketUpdate(inner) => rmp_serde::to_vec_named(inner)?,
495            EngineMessage::OrderFilled { fill, accounting } => {
496                rmp_serde::to_vec_named(&StoredFillEvent {
497                    fill: fill.clone(),
498                    accounting: *accounting,
499                })?
500            }
501            EngineMessage::OrderbookUpdated(inner) => rmp_serde::to_vec_named(inner)?,
502            EngineMessage::L2Update(inner) => rmp_serde::to_vec_named(inner)?,
503            EngineMessage::Trade(inner) => rmp_serde::to_vec_named(inner)?,
504            EngineMessage::TransactionRequest(inner) => rmp_serde::to_vec_named(inner)?,
505            EngineMessage::TransactionUpdate(inner) => rmp_serde::to_vec_named(inner)?,
506            EngineMessage::MmpTriggered(inner) => rmp_serde::to_vec_named(inner)?,
507            EngineMessage::PositionExpired(inner) => rmp_serde::to_vec_named(inner)?,
508            EngineMessage::TierUpdate(inner) => rmp_serde::to_vec_named(inner)?,
509            EngineMessage::HypercorePositionUpdate(inner) => rmp_serde::to_vec_named(inner)?,
510            EngineMessage::LiquidationStateChange(inner) => rmp_serde::to_vec_named(inner)?,
511            EngineMessage::RfqFilled(inner) => rmp_serde::to_vec_named(inner)?,
512        };
513        let mut buf = Vec::with_capacity(1 + payload.len());
514        buf.push(WIRE_FORMAT_VERSION);
515        buf.extend_from_slice(&payload);
516        Ok(buf)
517    }
518}
519
520#[cfg(test)]
521mod tests {
522    use super::*;
523    use crate::wallet_address::test_wallet;
524    use rust_decimal_macros::dec;
525
526    #[derive(Debug, Serialize, Deserialize)]
527    struct OldPositionExpiredMessage {
528        wallet_address: WalletAddress,
529        symbol: String,
530        position_size: Decimal,
531        settlement_price: Decimal,
532        settlement_value: Decimal,
533        timestamp: u64,
534    }
535
536    #[test]
537    fn position_expired_rejects_payload_without_margin_mode() {
538        let old = OldPositionExpiredMessage {
539            wallet_address: test_wallet(42),
540            symbol: "BTC-20250115-100000-C".to_string(),
541            position_size: dec!(1.25),
542            settlement_price: dec!(5000),
543            settlement_value: dec!(6250),
544            timestamp: 1736899200000,
545        };
546
547        let bytes = rmp_serde::to_vec_named(&old).expect("serialize old payload");
548        let err = rmp_serde::from_slice::<PositionExpiredMessage>(&bytes)
549            .expect_err("missing margin_mode must fail deserialization");
550
551        assert!(
552            err.to_string().contains("missing field `margin_mode`"),
553            "unexpected error: {err}"
554        );
555    }
556
557    #[derive(Debug, Serialize, Deserialize)]
558    struct PositionExpiredMessageWithoutEconomics {
559        wallet_address: WalletAddress,
560        margin_mode: crate::MarginMode,
561        symbol: String,
562        position_size: Decimal,
563        settlement_price: Decimal,
564        settlement_value: Decimal,
565        timestamp: u64,
566    }
567
568    #[test]
569    fn position_expired_deserializes_missing_economics_as_none() {
570        let old = PositionExpiredMessageWithoutEconomics {
571            wallet_address: test_wallet(42),
572            margin_mode: crate::MarginMode::Standard,
573            symbol: "BTC-20250115-100000-C".to_string(),
574            position_size: dec!(1.25),
575            settlement_price: dec!(5000),
576            settlement_value: dec!(6250),
577            timestamp: 1736899200000,
578        };
579
580        let bytes = rmp_serde::to_vec_named(&old).expect("serialize payload");
581        let parsed: PositionExpiredMessage =
582            rmp_serde::from_slice(&bytes).expect("deserialize into current payload");
583
584        assert_eq!(parsed.wallet_address, old.wallet_address);
585        assert_eq!(parsed.margin_mode, old.margin_mode);
586        assert_eq!(parsed.symbol, old.symbol);
587        assert_eq!(parsed.position_size, old.position_size);
588        assert_eq!(parsed.settlement_price, old.settlement_price);
589        assert_eq!(parsed.settlement_value, old.settlement_value);
590        assert_eq!(parsed.timestamp, old.timestamp);
591        assert_eq!(parsed.settlement_entry_price, None);
592        assert_eq!(parsed.cost_basis, None);
593        assert_eq!(parsed.net_pnl, None);
594    }
595
596    #[test]
597    fn order_filled_wire_round_trip_preserves_accounting() {
598        let fill = Fill {
599            trade_id: 7,
600            taker_order_id: 11,
601            maker_order_id: 12,
602            symbol: "BTC-20261231-100000-C".to_string(),
603            price: dec!(250),
604            size: dec!(100000000),
605            taker_side: Side::Buy,
606            taker_wallet_address: test_wallet(1),
607            maker_wallet_address: test_wallet(2),
608            fee: Decimal::ZERO,
609            is_taker: true,
610            timestamp: 1_700_000_000_000,
611            builder_code_address: None,
612            builder_code_fee: None,
613            source: crate::FillSource::Orderbook,
614            taker_realized_pnl: Some(dec!(10)),
615            maker_realized_pnl: Some(dec!(-10)),
616            underlying_notional: Some(dec!(10000000)),
617        };
618        let accounting = FillAccounting {
619            trade_id: fill.trade_id,
620            taker_realized_pnl: dec!(10),
621            maker_realized_pnl: dec!(-10),
622            taker_premium_delta: dec!(-250),
623            maker_premium_delta: dec!(250),
624            taker_net_cash_delta: dec!(-240),
625            maker_net_cash_delta: dec!(240),
626        };
627        let message = EngineMessage::OrderFilled {
628            fill: fill.clone(),
629            accounting,
630        };
631
632        let wire = message.serialize_inner().expect("serialize order filled");
633        let parsed = EngineMessage::deserialize_from_wire(topics::TOPIC_FILLS, &wire)
634            .expect("deserialize order filled");
635
636        let EngineMessage::OrderFilled {
637            fill: parsed_fill,
638            accounting: parsed_accounting,
639        } = parsed
640        else {
641            panic!("expected OrderFilled");
642        };
643        assert_eq!(parsed_fill.trade_id, fill.trade_id);
644        assert_eq!(parsed_fill.taker_order_id, fill.taker_order_id);
645        assert_eq!(parsed_fill.maker_order_id, fill.maker_order_id);
646        assert_eq!(parsed_fill.symbol, fill.symbol);
647        assert_eq!(parsed_fill.price, fill.price);
648        assert_eq!(parsed_fill.size, fill.size);
649        assert_eq!(parsed_fill.taker_side, fill.taker_side);
650        assert_eq!(parsed_fill.taker_wallet_address, fill.taker_wallet_address);
651        assert_eq!(parsed_fill.maker_wallet_address, fill.maker_wallet_address);
652        assert_eq!(parsed_fill.taker_realized_pnl, fill.taker_realized_pnl);
653        assert_eq!(parsed_fill.maker_realized_pnl, fill.maker_realized_pnl);
654        assert_eq!(parsed_fill.underlying_notional, fill.underlying_notional);
655        assert_eq!(parsed_accounting, accounting);
656    }
657
658    #[test]
659    fn order_filled_deserializes_legacy_fill_payload() {
660        let fill = Fill {
661            trade_id: 8,
662            taker_order_id: 21,
663            maker_order_id: 22,
664            symbol: "ETH-PERP".to_string(),
665            price: dec!(3000),
666            size: dec!(1000000),
667            taker_side: Side::Sell,
668            taker_wallet_address: test_wallet(3),
669            maker_wallet_address: test_wallet(4),
670            fee: Decimal::ZERO,
671            is_taker: true,
672            timestamp: 1_700_000_000_001,
673            builder_code_address: None,
674            builder_code_fee: None,
675            source: crate::FillSource::Orderbook,
676            taker_realized_pnl: Some(dec!(25)),
677            maker_realized_pnl: Some(dec!(-25)),
678            underlying_notional: None,
679        };
680
681        let mut wire = Vec::new();
682        wire.push(WIRE_FORMAT_VERSION);
683        wire.extend(rmp_serde::to_vec_named(&fill).expect("serialize legacy fill"));
684
685        let parsed = EngineMessage::deserialize_from_wire(topics::TOPIC_FILLS, &wire)
686            .expect("deserialize legacy fill");
687        let EngineMessage::OrderFilled {
688            fill: parsed_fill,
689            accounting,
690        } = parsed
691        else {
692            panic!("expected OrderFilled");
693        };
694        assert_eq!(parsed_fill.trade_id, fill.trade_id);
695        assert_eq!(parsed_fill.taker_order_id, fill.taker_order_id);
696        assert_eq!(parsed_fill.maker_order_id, fill.maker_order_id);
697        assert_eq!(parsed_fill.symbol, fill.symbol);
698        assert_eq!(parsed_fill.price, fill.price);
699        assert_eq!(parsed_fill.size, fill.size);
700        assert_eq!(parsed_fill.taker_side, fill.taker_side);
701        assert_eq!(parsed_fill.taker_wallet_address, fill.taker_wallet_address);
702        assert_eq!(parsed_fill.maker_wallet_address, fill.maker_wallet_address);
703        assert_eq!(parsed_fill.taker_realized_pnl, fill.taker_realized_pnl);
704        assert_eq!(parsed_fill.maker_realized_pnl, fill.maker_realized_pnl);
705        assert_eq!(parsed_fill.underlying_notional, None);
706        assert_eq!(accounting, FillAccounting::from_fill(&fill));
707    }
708}