Skip to main content

hypercall/hypercore/
hydromancer_feed.rs

//! Hydromancer WebSocket feed for real-time order updates, fills, and positions.
//!
//! Subscribes to `userOrderUpdates` and `userFills` for tracked accounts via
//! Hydromancer's WebSocket API. Maintains current position state per account.
//!
//! On boot and reconnect: fetches `portfolioState` from the Hydromancer API to seed
//! position snapshots, and replays the last 60 seconds of `userFillsByTime` results to
//! catch fills missed while disconnected. Uses the portfolio endpoint (not raw
//! `clearinghouseState`) so that spot USDC balances are included in account equity
//! for unified accounts.
10
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use futures::{SinkExt, StreamExt};
16use serde::{Deserialize, Serialize};
17use tokio::sync::{broadcast, RwLock};
18use tokio_tungstenite::tungstenite::Message;
19use tracing::{debug, error, info, warn};
20
21use hypercall_types::WalletAddress;
22
23// ---------------------------------------------------------------------------
24// Types
25// ---------------------------------------------------------------------------
26
27/// A perp position snapshot for one coin.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct PerpPosition {
30    pub coin: String,
31    pub size: f64,
32    pub entry_price: Option<f64>,
33    pub unrealized_pnl: f64,
34    pub margin_used: f64,
35}
36
37/// Full account state: equity + positions.
38#[derive(Debug, Clone, Default)]
39pub struct AccountState {
40    pub account_value: f64,
41    pub total_margin_used: f64,
42    pub withdrawable: f64,
43    pub positions: Vec<PerpPosition>,
44    pub last_updated_ms: u64,
45}
46
47/// An order update event from Hydromancer WS.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49#[serde(rename_all = "camelCase")]
50pub struct OrderUpdate {
51    pub coin: String,
52    pub oid: Option<u64>,
53    pub side: Option<String>,
54    pub sz: Option<String>,
55    pub limit_px: Option<String>,
56    pub order_type: Option<String>,
57    pub status: Option<String>,
58    pub filled_sz: Option<String>,
59    pub avg_px: Option<String>,
60    pub timestamp: Option<u64>,
61}
62
63/// A fill event from Hydromancer WS.
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct FillEvent {
66    pub coin: String,
67    pub side: String,
68    pub px: String,
69    pub sz: String,
70    pub time: u64,
71    pub oid: Option<u64>,
72    pub fee: Option<String>,
73}
74
75/// Events emitted by the feed service.
76#[derive(Debug, Clone)]
77pub enum HydromancerEvent {
78    PositionSnapshot {
79        account: WalletAddress,
80        state: AccountState,
81    },
82    OrderUpdate {
83        account: WalletAddress,
84        update: OrderUpdate,
85    },
86    Fill {
87        account: WalletAddress,
88        fill: FillEvent,
89    },
90}
91
92// ---------------------------------------------------------------------------
93// Service
94// ---------------------------------------------------------------------------
95
96pub struct HydromancerFeedService {
97    ws_url: String,
98    accounts: Arc<RwLock<Vec<WalletAddress>>>,
99    positions: Arc<RwLock<HashMap<WalletAddress, AccountState>>>,
100    event_tx: broadcast::Sender<HydromancerEvent>,
101    hl_info_url: String,
102    /// Hydromancer API URL for portfolioState queries (includes spot balances).
103    hydromancer_api_url: String,
104    /// Bearer token for Hydromancer API authentication.
105    hydromancer_api_key: Option<String>,
106}
107
108#[async_trait::async_trait]
109impl hypercall_api::state::DirectiveHydromancerFeed for HydromancerFeedService {
110    async fn add_account(&self, account: WalletAddress) {
111        self.add_account(account).await;
112    }
113
114    async fn wait_for_fills(
115        &self,
116        account: &WalletAddress,
117        timeout: Duration,
118    ) -> Vec<hypercall_api::state::DirectiveHydromancerFill> {
119        self.wait_for_fills(account, timeout)
120            .await
121            .into_iter()
122            .map(|fill| hypercall_api::state::DirectiveHydromancerFill {
123                coin: fill.coin,
124                side: fill.side,
125                px: fill.px,
126                sz: fill.sz,
127                time: fill.time,
128            })
129            .collect()
130    }
131}
132
133impl HydromancerFeedService {
134    pub fn new(
135        ws_url: String,
136        hl_info_url: String,
137        hydromancer_api_url: String,
138        hydromancer_api_key: Option<String>,
139        initial_accounts: Vec<WalletAddress>,
140    ) -> Self {
141        let (event_tx, _) = broadcast::channel(1024);
142        Self {
143            ws_url,
144            accounts: Arc::new(RwLock::new(initial_accounts)),
145            positions: Arc::new(RwLock::new(HashMap::new())),
146            event_tx,
147            hl_info_url,
148            hydromancer_api_url,
149            hydromancer_api_key,
150        }
151    }
152
153    pub fn subscribe(&self) -> broadcast::Receiver<HydromancerEvent> {
154        self.event_tx.subscribe()
155    }
156
157    pub async fn add_account(&self, account: WalletAddress) {
158        let mut accounts = self.accounts.write().await;
159        if !accounts.contains(&account) {
160            accounts.push(account);
161            info!(account = %account, "HydromancerFeed: added account");
162        }
163    }
164
165    /// Get current position state for an account. Returns None if not yet loaded.
166    pub async fn get_positions(&self, account: &WalletAddress) -> Option<AccountState> {
167        self.positions.read().await.get(account).cloned()
168    }
169
170    /// Get current position for a specific coin. Returns 0.0 if no position.
171    pub async fn get_position_size(&self, account: &WalletAddress, coin: &str) -> f64 {
172        self.positions
173            .read()
174            .await
175            .get(account)
176            .and_then(|state| state.positions.iter().find(|p| p.coin == coin))
177            .map(|p| p.size)
178            .unwrap_or(0.0)
179    }
180
181    /// Wait for a fill event for a specific account, with timeout.
182    pub async fn wait_for_fills(
183        &self,
184        account: &WalletAddress,
185        timeout: Duration,
186    ) -> Vec<FillEvent> {
187        let mut rx = self.event_tx.subscribe();
188        let deadline = Instant::now() + timeout;
189        let mut fills = Vec::new();
190
191        loop {
192            let remaining = deadline.saturating_duration_since(Instant::now());
193            if remaining.is_zero() {
194                break;
195            }
196
197            match tokio::time::timeout(remaining, rx.recv()).await {
198                Ok(Ok(HydromancerEvent::Fill { account: a, fill })) if a == *account => {
199                    fills.push(fill);
200                    tokio::time::sleep(Duration::from_millis(200)).await;
201                }
202                Ok(Ok(_)) => continue,
203                Ok(Err(_)) => break,
204                Err(_) => break,
205            }
206        }
207
208        fills
209    }
210
211    // -----------------------------------------------------------------------
212    // Lifecycle
213    // -----------------------------------------------------------------------
214
215    /// Start the feed service. Runs until shutdown.
216    pub async fn start(&self, mut shutdown_rx: broadcast::Receiver<()>) {
217        const BASE_BACKOFF: Duration = Duration::from_secs(1);
218        const MAX_BACKOFF: Duration = Duration::from_secs(30);
219        let mut consecutive_failures: u32 = 0;
220
221        loop {
222            let before = Instant::now();
223
224            // On every (re)connect: fetch full position snapshots + recent fills
225            self.reconcile_on_connect().await;
226
227            tokio::select! {
228                _ = shutdown_rx.recv() => {
229                    info!("HydromancerFeed: shutdown signal received");
230                    return;
231                }
232                _ = self.run_connection() => {
233                    if before.elapsed() > Duration::from_secs(60) {
234                        consecutive_failures = 0;
235                    }
236                    consecutive_failures += 1;
237                    let backoff = std::cmp::min(
238                        BASE_BACKOFF * 2u32.saturating_pow(consecutive_failures - 1),
239                        MAX_BACKOFF,
240                    );
241                    warn!(
242                        failures = consecutive_failures,
243                        backoff_ms = backoff.as_millis() as u64,
244                        "HydromancerFeed: connection lost, reconnecting"
245                    );
246                    tokio::select! {
247                        _ = shutdown_rx.recv() => {
248                            info!("HydromancerFeed: shutdown during backoff");
249                            return;
250                        }
251                        _ = tokio::time::sleep(backoff) => {}
252                    }
253                }
254            }
255        }
256    }
257
258    // -----------------------------------------------------------------------
259    // REST reconciliation (boot + reconnect)
260    // -----------------------------------------------------------------------
261
262    async fn reconcile_on_connect(&self) {
263        let accts = self.accounts.read().await.clone();
264        if accts.is_empty() {
265            return;
266        }
267
268        let client = match reqwest::Client::builder()
269            .timeout(Duration::from_secs(10))
270            .build()
271        {
272            Ok(c) => c,
273            Err(e) => {
274                error!("HydromancerFeed: failed to create HTTP client: {}", e);
275                return;
276            }
277        };
278
279        let now_ms = crate::shared::order_types::get_timestamp_millis();
280
281        for account in &accts {
282            // 1. Fetch portfolioState (perp + spot) via Hydromancer API
283            let body = serde_json::json!({
284                "type": "portfolioState",
285                "user": format!("{}", account),
286            });
287
288            let mut request = client.post(&self.hydromancer_api_url).json(&body);
289            if let Some(ref key) = self.hydromancer_api_key {
290                request = request.header("Authorization", format!("Bearer {}", key));
291            }
292
293            match request.send().await {
294                Ok(resp) => {
295                    if let Ok(state_json) = resp.json::<serde_json::Value>().await {
296                        let account_state = parse_portfolio_state(&state_json, now_ms);
297                        self.positions
298                            .write()
299                            .await
300                            .insert(*account, account_state.clone());
301                        let _ = self.event_tx.send(HydromancerEvent::PositionSnapshot {
302                            account: *account,
303                            state: account_state,
304                        });
305                    }
306                }
307                Err(e) => {
308                    warn!(account = %account, "HydromancerFeed: portfolioState fetch failed: {}", e);
309                }
310            }
311
312            // 2. Fetch recent fills (last 60s)
313            let fills_body = serde_json::json!({
314                "type": "userFillsByTime",
315                "user": format!("{}", account),
316                "startTime": now_ms.saturating_sub(60_000),
317            });
318
319            if let Ok(resp) = client
320                .post(&self.hl_info_url)
321                .json(&fills_body)
322                .send()
323                .await
324            {
325                if let Ok(fills_json) = resp.json::<Vec<serde_json::Value>>().await {
326                    for fill_val in fills_json {
327                        if let Ok(fill) = serde_json::from_value::<FillEvent>(fill_val) {
328                            let _ = self.event_tx.send(HydromancerEvent::Fill {
329                                account: *account,
330                                fill,
331                            });
332                        }
333                    }
334                }
335            }
336        }
337
338        info!(
339            accounts = accts.len(),
340            "HydromancerFeed: reconciliation complete"
341        );
342    }
343
344    // -----------------------------------------------------------------------
345    // WebSocket connection
346    // -----------------------------------------------------------------------
347
348    async fn run_connection(&self) {
349        let redacted_url = self.ws_url.split("?token=").next().unwrap_or(&self.ws_url);
350        info!(url = %redacted_url, "HydromancerFeed: connecting");
351
352        let (ws_stream, _) = match tokio_tungstenite::connect_async(&self.ws_url).await {
353            Ok(conn) => {
354                info!("HydromancerFeed: connected");
355                conn
356            }
357            Err(e) => {
358                error!("HydromancerFeed: connection failed: {}", e);
359                return;
360            }
361        };
362
363        let (mut write, mut read) = ws_stream.split();
364
365        // Read the "connected" message
366        match tokio::time::timeout(Duration::from_secs(5), read.next()).await {
367            Ok(Some(Ok(Message::Text(text)))) => {
368                debug!("HydromancerFeed: {}", text);
369            }
370            _ => {
371                error!("HydromancerFeed: no connected message received");
372                return;
373            }
374        }
375
376        // Subscribe for all tracked accounts
377        let accts = self.accounts.read().await;
378        let addresses: Vec<String> = accts.iter().map(|a| format!("{}", a)).collect();
379        drop(accts);
380
381        if addresses.is_empty() {
382            warn!("HydromancerFeed: no accounts to subscribe to");
383        }
384
385        for sub_type in &["userOrderUpdates", "userFills"] {
386            let sub = serde_json::json!({
387                "method": "subscribe",
388                "subscription": {
389                    "type": sub_type,
390                    "addresses": addresses,
391                }
392            });
393            let msg = serde_json::to_string(&sub).expect("serialize subscription");
394            if let Err(e) = write.send(Message::Text(msg)).await {
395                error!(
396                    sub_type = sub_type,
397                    "HydromancerFeed: subscribe failed: {}", e
398                );
399                return;
400            }
401        }
402
403        // Read subscription confirmations
404        for _ in 0..2 {
405            match tokio::time::timeout(Duration::from_secs(5), read.next()).await {
406                Ok(Some(Ok(Message::Text(text)))) => {
407                    debug!("HydromancerFeed: sub response: {}", text);
408                }
409                _ => {
410                    warn!("HydromancerFeed: subscription confirmation timeout");
411                }
412            }
413        }
414
415        info!(
416            accounts = addresses.len(),
417            "HydromancerFeed: subscribed to userOrderUpdates + userFills"
418        );
419
420        // Event loop
421        const WS_READ_TIMEOUT: Duration = Duration::from_secs(60);
422
423        loop {
424            match tokio::time::timeout(WS_READ_TIMEOUT, read.next()).await {
425                Ok(Some(Ok(Message::Text(text)))) => {
426                    self.handle_message(&text);
427                }
428                Ok(Some(Ok(Message::Ping(data)))) => {
429                    if let Err(e) = write.send(Message::Pong(data)).await {
430                        error!("HydromancerFeed: pong failed: {}", e);
431                        return;
432                    }
433                }
434                Ok(Some(Ok(Message::Close(_)))) => {
435                    info!("HydromancerFeed: server closed connection");
436                    return;
437                }
438                Ok(Some(Err(e))) => {
439                    error!("HydromancerFeed: read error: {}", e);
440                    return;
441                }
442                Ok(None) => {
443                    info!("HydromancerFeed: stream ended");
444                    return;
445                }
446                Err(_) => {
447                    warn!(
448                        "HydromancerFeed: read timeout ({}s), reconnecting",
449                        WS_READ_TIMEOUT.as_secs()
450                    );
451                    return;
452                }
453                _ => {}
454            }
455        }
456    }
457
458    fn handle_message(&self, text: &str) {
459        let msg: serde_json::Value = match serde_json::from_str(text) {
460            Ok(v) => v,
461            Err(e) => {
462                warn!("HydromancerFeed: invalid JSON: {}", e);
463                return;
464            }
465        };
466
467        let channel = msg.get("channel").and_then(|v| v.as_str()).unwrap_or("");
468        let data = match msg.get("data") {
469            Some(d) => d,
470            None => return,
471        };
472
473        if channel.starts_with("userFills") {
474            self.handle_fills(data);
475        } else if channel.starts_with("userOrderUpdates") {
476            self.handle_order_updates(data);
477        }
478    }
479
480    fn handle_fills(&self, data: &serde_json::Value) {
481        let fills = match data.as_array() {
482            Some(a) => a,
483            None => return,
484        };
485
486        for fill_val in fills {
487            let account_str = fill_val.get("user").and_then(|v| v.as_str()).unwrap_or("");
488            let account = match account_str.parse::<WalletAddress>() {
489                Ok(a) => a,
490                Err(_) => continue,
491            };
492            let fill: FillEvent = match serde_json::from_value(fill_val.clone()) {
493                Ok(f) => f,
494                Err(_) => continue,
495            };
496
497            // Update position based on fill
498            self.apply_fill_to_positions(&account, &fill);
499
500            let _ = self.event_tx.send(HydromancerEvent::Fill { account, fill });
501        }
502    }
503
504    fn handle_order_updates(&self, data: &serde_json::Value) {
505        let updates = match data.as_array() {
506            Some(a) => a,
507            None => return,
508        };
509
510        for update_val in updates {
511            let account_str = update_val
512                .get("user")
513                .and_then(|v| v.as_str())
514                .unwrap_or("");
515            let account = match account_str.parse::<WalletAddress>() {
516                Ok(a) => a,
517                Err(_) => continue,
518            };
519            let update: OrderUpdate = match serde_json::from_value(update_val.clone()) {
520                Ok(u) => u,
521                Err(_) => continue,
522            };
523            let _ = self
524                .event_tx
525                .send(HydromancerEvent::OrderUpdate { account, update });
526        }
527    }
528
529    /// Apply a fill to the in-memory position state.
530    /// This is a best-effort delta update — the full snapshot from REST is the source of truth.
531    fn apply_fill_to_positions(&self, account: &WalletAddress, fill: &FillEvent) {
532        let size: f64 = fill.sz.parse().unwrap_or(0.0);
533        let signed_size = if fill.side == "B" { size } else { -size };
534
535        // Try to update synchronously via try_write to avoid blocking the WS read loop.
536        // If the lock is held (reconciliation running), skip — the next reconciliation
537        // will correct the state anyway.
538        if let Ok(mut positions) = self.positions.try_write() {
539            let state = positions.entry(*account).or_default();
540            if let Some(pos) = state.positions.iter_mut().find(|p| p.coin == fill.coin) {
541                pos.size += signed_size;
542            } else {
543                state.positions.push(PerpPosition {
544                    coin: fill.coin.clone(),
545                    size: signed_size,
546                    entry_price: fill.px.parse().ok(),
547                    unrealized_pnl: 0.0,
548                    margin_used: 0.0,
549                });
550            }
551            state.last_updated_ms = fill.time;
552        }
553    }
554}
555
556// ---------------------------------------------------------------------------
557// Helpers
558// ---------------------------------------------------------------------------
559
560/// Parse `portfolioState` response which wraps data under `clearinghouseState` and
561/// `spotClearinghouseState` keys. Computes unified account value as
562/// `perp_account_value + spot_usdc_balance`.
563fn parse_portfolio_state(json: &serde_json::Value, now_ms: u64) -> AccountState {
564    // The portfolioState response nests perp data under `clearinghouseState`.
565    // Fall back to top-level keys for backward compatibility with raw clearinghouseState.
566    let ch = json.get("clearinghouseState").unwrap_or(json);
567
568    let ms = ch.get("marginSummary").unwrap_or(ch);
569
570    let perp_account_value = ms
571        .get("accountValue")
572        .and_then(|v| v.as_str())
573        .and_then(|s| s.parse::<f64>().ok())
574        .unwrap_or(0.0);
575
576    let total_margin_used = ms
577        .get("totalMarginUsed")
578        .and_then(|v| v.as_str())
579        .and_then(|s| s.parse::<f64>().ok())
580        .unwrap_or(0.0);
581
582    let withdrawable = ch
583        .get("withdrawable")
584        .and_then(|v| v.as_str())
585        .and_then(|s| s.parse::<f64>().ok())
586        .unwrap_or(0.0);
587
588    let positions = ch
589        .get("assetPositions")
590        .and_then(|v| v.as_array())
591        .map(|arr| {
592            arr.iter()
593                .filter_map(|ap| {
594                    let pos = ap.get("position")?;
595                    let coin = pos.get("coin")?.as_str()?.to_string();
596                    let size = pos.get("szi")?.as_str()?.parse::<f64>().ok()?;
597                    Some(PerpPosition {
598                        coin,
599                        size,
600                        entry_price: pos
601                            .get("entryPx")
602                            .and_then(|v| v.as_str())
603                            .and_then(|s| s.parse().ok()),
604                        unrealized_pnl: pos
605                            .get("unrealizedPnl")
606                            .and_then(|v| v.as_str())
607                            .and_then(|s| s.parse().ok())
608                            .unwrap_or(0.0),
609                        margin_used: pos
610                            .get("marginUsed")
611                            .and_then(|v| v.as_str())
612                            .and_then(|s| s.parse().ok())
613                            .unwrap_or(0.0),
614                    })
615                })
616                .collect()
617        })
618        .unwrap_or_default();
619
620    // Extract spot USDC balance from spotClearinghouseState.balances
621    let spot_usdc = json
622        .get("spotClearinghouseState")
623        .and_then(|spot| spot.get("balances"))
624        .and_then(|b| b.as_array())
625        .and_then(|balances| {
626            balances.iter().find_map(|entry| {
627                let coin = entry.get("coin")?.as_str()?;
628                if coin == "USDC" {
629                    entry
630                        .get("total")
631                        .and_then(|v| v.as_str())
632                        .and_then(|s| s.parse::<f64>().ok())
633                } else {
634                    None
635                }
636            })
637        })
638        .unwrap_or(0.0);
639
640    let account_value = perp_account_value + spot_usdc;
641
642    AccountState {
643        account_value,
644        total_margin_used,
645        withdrawable,
646        positions,
647        last_updated_ms: now_ms,
648    }
649}
650
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    // ----- shared fixtures -------------------------------------------------

    /// Address used as "the" tracked account throughout the tests.
    fn test_account() -> WalletAddress {
        WalletAddress::from_str("0x72471e6cb5be1b2db2eb00c38b7bca66c8c310c5").unwrap()
    }

    /// Service pointing at placeholder endpoints, with no key and no accounts.
    fn make_service() -> HydromancerFeedService {
        HydromancerFeedService::new(
            "ws://fake".into(),
            "http://fake".into(),
            "http://fake-hydromancer".into(),
            None,
            vec![],
        )
    }

    /// Fill without order id or fee.
    fn make_fill(coin: &str, side: &str, px: &str, sz: &str, time: u64) -> FillEvent {
        FillEvent {
            coin: coin.into(),
            side: side.into(),
            px: px.into(),
            sz: sz.into(),
            time,
            oid: None,
            fee: None,
        }
    }

    /// Parse a JSON literal, panicking on malformed input.
    fn parse_json(raw: &str) -> serde_json::Value {
        serde_json::from_str(raw).unwrap()
    }

    /// True when `actual` is within `tolerance` of `expected`.
    fn close(actual: f64, expected: f64, tolerance: f64) -> bool {
        (actual - expected).abs() < tolerance
    }

    // ----- parse_portfolio_state -------------------------------------------

    #[test]
    fn test_parse_portfolio_state() {
        let json = parse_json(
            r#"{
                "clearinghouseState": {
                    "marginSummary": {
                        "accountValue": "350.0",
                        "totalNtlPos": "3100.0",
                        "totalRawUsd": "350.0",
                        "totalMarginUsed": "77.5"
                    },
                    "withdrawable": "272.5",
                    "assetPositions": [
                        {
                            "position": {
                                "coin": "BTC",
                                "szi": "-0.04",
                                "entryPx": "77610.5",
                                "positionValue": "3100.0",
                                "unrealizedPnl": "-20.0",
                                "marginUsed": "77.5",
                                "returnOnEquity": "-0.05",
                                "liquidationPx": null,
                                "leverage": {"type": "cross", "value": 40}
                            },
                            "type": "oneWay"
                        }
                    ],
                    "time": 1776489498654
                },
                "spotClearinghouseState": {
                    "balances": [
                        {"coin": "USDC", "total": "500.0", "hold": "0.0"},
                        {"coin": "HYPE", "total": "10.0", "hold": "0.0"}
                    ]
                }
            }"#,
        );

        let state = parse_portfolio_state(&json, 1000);

        // account_value = perp(350.0) + spot_usdc(500.0) = 850.0
        assert!(close(state.account_value, 850.0, 0.01));
        assert!(close(state.total_margin_used, 77.5, 0.01));
        assert!(close(state.withdrawable, 272.5, 0.01));

        assert_eq!(state.positions.len(), 1);
        let btc = &state.positions[0];
        assert_eq!(btc.coin, "BTC");
        assert!(close(btc.size, -0.04, 0.0001));
        assert_eq!(btc.entry_price, Some(77610.5));
    }

    #[test]
    fn test_parse_portfolio_state_no_spot() {
        // portfolioState response with no spotClearinghouseState -- spot USDC defaults to 0
        let json = parse_json(
            r#"{
                "clearinghouseState": {
                    "marginSummary": {"accountValue": "100.0", "totalNtlPos": "0.0", "totalRawUsd": "100.0", "totalMarginUsed": "0.0"},
                    "withdrawable": "100.0",
                    "assetPositions": []
                }
            }"#,
        );

        let state = parse_portfolio_state(&json, 0);
        assert!(close(state.account_value, 100.0, 0.01));
        assert!(state.positions.is_empty());
    }

    #[test]
    fn test_parse_portfolio_state_empty() {
        let json = parse_json(
            r#"{
                "clearinghouseState": {
                    "marginSummary": {"accountValue": "0.0", "totalNtlPos": "0.0", "totalRawUsd": "0.0", "totalMarginUsed": "0.0"},
                    "withdrawable": "0.0",
                    "assetPositions": []
                },
                "spotClearinghouseState": {
                    "balances": []
                }
            }"#,
        );

        let state = parse_portfolio_state(&json, 0);
        assert_eq!(state.account_value, 0.0);
        assert!(state.positions.is_empty());
    }

    #[test]
    fn test_parse_portfolio_state_backward_compat() {
        // Backward compat: raw clearinghouseState format (no wrapper) still works
        let json = parse_json(
            r#"{
                "marginSummary": {"accountValue": "200.0", "totalNtlPos": "0.0", "totalRawUsd": "200.0", "totalMarginUsed": "50.0"},
                "withdrawable": "150.0",
                "assetPositions": []
            }"#,
        );

        let state = parse_portfolio_state(&json, 0);
        assert!(close(state.account_value, 200.0, 0.01));
        assert!(close(state.total_margin_used, 50.0, 0.01));
    }

    // ----- serde round-trips -----------------------------------------------

    #[test]
    fn test_fill_event_serde() {
        let json = r#"{"coin":"BTC","side":"B","px":"78734.0","sz":"0.01","time":1776489498654}"#;
        let fill: FillEvent = serde_json::from_str(json).unwrap();
        assert_eq!(fill.coin, "BTC");
        assert_eq!(fill.side, "B");
        assert_eq!(fill.sz, "0.01");
        assert_eq!(fill.time, 1776489498654);
    }

    #[test]
    fn test_order_update_serde() {
        let json = r#"{"coin":"BTC","oid":12345,"side":"B","sz":"0.01","limitPx":"80000","status":"filled"}"#;
        let update: OrderUpdate = serde_json::from_str(json).unwrap();
        assert_eq!(update.coin, "BTC");
        assert_eq!(update.oid, Some(12345));
        assert_eq!(update.status.as_deref(), Some("filled"));
    }

    // ----- service state ---------------------------------------------------

    #[tokio::test]
    async fn test_service_add_account() {
        let svc = make_service();
        let account = test_account();

        // The second call is a duplicate and should be a no-op.
        svc.add_account(account).await;
        svc.add_account(account).await;

        let tracked = svc.accounts.read().await;
        assert_eq!(tracked.len(), 1);
    }

    #[tokio::test]
    async fn test_service_get_positions_empty() {
        let svc = make_service();
        let account = test_account();

        assert!(svc.get_positions(&account).await.is_none());
        assert_eq!(svc.get_position_size(&account, "BTC").await, 0.0);
    }

    #[tokio::test]
    async fn test_service_apply_fill_to_positions() {
        let svc = make_service();
        let account = test_account();

        // Seed empty state
        svc.positions
            .write()
            .await
            .insert(account, AccountState::default());

        // Buy 0.01, then sell 0.005: size should go 0.01 -> 0.005.
        let buy = make_fill("BTC", "B", "77000", "0.01", 1000);
        svc.apply_fill_to_positions(&account, &buy);
        assert!(close(
            svc.get_position_size(&account, "BTC").await,
            0.01,
            0.0001
        ));

        let sell = make_fill("BTC", "S", "78000", "0.005", 2000);
        svc.apply_fill_to_positions(&account, &sell);
        assert!(close(
            svc.get_position_size(&account, "BTC").await,
            0.005,
            0.0001
        ));
    }

    // ----- wait_for_fills --------------------------------------------------

    #[tokio::test]
    async fn test_service_wait_for_fills_timeout() {
        let svc = make_service();
        let account = test_account();

        let fills = svc
            .wait_for_fills(&account, Duration::from_millis(100))
            .await;
        assert!(fills.is_empty());
    }

    #[tokio::test]
    async fn test_service_wait_for_fills_receives() {
        let svc = Arc::new(make_service());
        let account = test_account();

        // Publish a fill for the watched account after 50ms.
        let publisher = Arc::clone(&svc);
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let _ = publisher.event_tx.send(HydromancerEvent::Fill {
                account,
                fill: make_fill("BTC", "B", "77000", "0.01", 1000),
            });
        });

        let fills = svc.wait_for_fills(&account, Duration::from_secs(1)).await;
        assert_eq!(fills.len(), 1);
        assert_eq!(fills[0].coin, "BTC");
        assert_eq!(fills[0].sz, "0.01");
    }

    #[tokio::test]
    async fn test_service_wait_ignores_other_accounts() {
        let svc = Arc::new(make_service());
        let account = test_account();
        let other = WalletAddress::from_str("0x0000000000000000000000000000000000000001").unwrap();

        // Publish a fill for a different account after 50ms.
        let publisher = Arc::clone(&svc);
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let _ = publisher.event_tx.send(HydromancerEvent::Fill {
                account: other,
                fill: make_fill("ETH", "S", "2500", "1.0", 1000),
            });
        });

        let fills = svc
            .wait_for_fills(&account, Duration::from_millis(200))
            .await;
        assert!(fills.is_empty());
    }
}
953}