1use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use futures::{SinkExt, StreamExt};
16use serde::{Deserialize, Serialize};
17use tokio::sync::{broadcast, RwLock};
18use tokio_tungstenite::tungstenite::Message;
19use tracing::{debug, error, info, warn};
20
21use hypercall_types::WalletAddress;
22
/// A single perpetual position held by an account.
///
/// Built either from a portfolio snapshot (`parse_portfolio_state`) or
/// incrementally from fills (`HydromancerFeedService::apply_fill_to_positions`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerpPosition {
    /// Asset symbol, taken from the `coin` field of a snapshot position or fill.
    pub coin: String,
    /// Signed size. Snapshots parse it from `szi`; fills with side `"B"` add
    /// to it and every other side subtracts from it.
    pub size: f64,
    /// Entry price when one could be parsed: `entryPx` for snapshot positions,
    /// the fill price for a position first seen through a fill.
    pub entry_price: Option<f64>,
    /// Parsed from `unrealizedPnl` in snapshots; `0.0` when missing or when the
    /// position was created from a fill.
    pub unrealized_pnl: f64,
    /// Parsed from `marginUsed` in snapshots; `0.0` when missing or when the
    /// position was created from a fill.
    pub margin_used: f64,
}
36
/// Cached view of one account's balances and open perpetual positions.
///
/// `Default` yields an all-zero state with no positions; that is what
/// `apply_fill_to_positions` starts from for an account with no snapshot yet.
#[derive(Debug, Clone, Default)]
pub struct AccountState {
    /// Perp `accountValue` plus the spot `USDC` total, as computed by
    /// `parse_portfolio_state`.
    pub account_value: f64,
    /// Parsed from `totalMarginUsed` of the margin summary.
    pub total_margin_used: f64,
    /// Parsed from the `withdrawable` field of the clearinghouse state.
    pub withdrawable: f64,
    /// Open positions, at most one entry per coin when maintained by this module.
    pub positions: Vec<PerpPosition>,
    /// Timestamp of the last change: the caller-supplied `now_ms` for
    /// snapshots, the fill's `time` after a fill is applied.
    pub last_updated_ms: u64,
}
46
/// One entry of a `userOrderUpdates` message.
///
/// Field names are mapped to/from camelCase on the wire (`limit_px` <->
/// `limitPx`, `filled_sz` <-> `filledSz`, ...). Only `coin` is required; every
/// other field deserializes to `None` when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OrderUpdate {
    /// Asset symbol the order refers to.
    pub coin: String,
    /// Order id, if present.
    pub oid: Option<u64>,
    /// Order side as sent by the feed (kept as an opaque string here).
    pub side: Option<String>,
    /// Order size as a string, exactly as received.
    pub sz: Option<String>,
    /// Limit price as a string, exactly as received (`limitPx` on the wire).
    pub limit_px: Option<String>,
    /// Order type as a string (`orderType` on the wire).
    pub order_type: Option<String>,
    /// Order status as a string (e.g. `"filled"` in this file's tests).
    pub status: Option<String>,
    /// Filled size as a string (`filledSz` on the wire).
    pub filled_sz: Option<String>,
    /// Average fill price as a string (`avgPx` on the wire).
    pub avg_px: Option<String>,
    /// Timestamp as sent by the feed — presumably milliseconds; TODO confirm.
    pub timestamp: Option<u64>,
}
62
/// One executed fill, as delivered by the `userFills` channel or by the
/// `userFillsByTime` request issued during reconciliation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FillEvent {
    /// Asset symbol that was traded.
    pub coin: String,
    /// Side of the fill. `apply_fill_to_positions` treats `"B"` as increasing
    /// the position and any other value as decreasing it.
    pub side: String,
    /// Fill price as a decimal string (parsed to `f64` when seeding a position).
    pub px: String,
    /// Fill size as a decimal string (parsed to `f64` when applied to positions).
    pub sz: String,
    /// Fill timestamp; copied into `AccountState::last_updated_ms`, so
    /// presumably milliseconds — TODO confirm.
    pub time: u64,
    /// Id of the order that produced the fill, if provided.
    pub oid: Option<u64>,
    /// Fee as a string, if provided.
    pub fee: Option<String>,
}
74
/// Events published on the service's broadcast channel
/// (see `HydromancerFeedService::subscribe`).
#[derive(Debug, Clone)]
pub enum HydromancerEvent {
    /// Full account state fetched during `reconcile_on_connect`.
    PositionSnapshot {
        /// Account the snapshot belongs to.
        account: WalletAddress,
        /// State parsed from the `portfolioState` response.
        state: AccountState,
    },
    /// An order update received on the `userOrderUpdates` channel.
    OrderUpdate {
        /// Account named in the update's `user` field.
        account: WalletAddress,
        /// The deserialized update.
        update: OrderUpdate,
    },
    /// A fill, either received live on the `userFills` channel or replayed
    /// from `userFillsByTime` during reconciliation.
    Fill {
        /// Account the fill belongs to.
        account: WalletAddress,
        /// The deserialized fill.
        fill: FillEvent,
    },
}
91
/// WebSocket feed client that tracks fills, order updates and positions for a
/// set of accounts and re-broadcasts them as [`HydromancerEvent`]s.
pub struct HydromancerFeedService {
    /// WebSocket endpoint. May carry a `?token=` query parameter, which is
    /// stripped before the URL is logged.
    ws_url: String,
    /// Accounts to subscribe to and reconcile; read on every (re)connect.
    accounts: Arc<RwLock<Vec<WalletAddress>>>,
    /// Latest known state per account (snapshot plus fills applied since).
    positions: Arc<RwLock<HashMap<WalletAddress, AccountState>>>,
    /// Sender side of the event broadcast channel.
    event_tx: broadcast::Sender<HydromancerEvent>,
    /// HTTP endpoint that receives the `userFillsByTime` request during
    /// reconciliation.
    hl_info_url: String,
    /// HTTP endpoint that receives the `portfolioState` request during
    /// reconciliation.
    hydromancer_api_url: String,
    /// Optional key sent as `Authorization: Bearer <key>` with
    /// `portfolioState` requests.
    hydromancer_api_key: Option<String>,
}
107
108#[async_trait::async_trait]
109impl hypercall_api::state::DirectiveHydromancerFeed for HydromancerFeedService {
110 async fn add_account(&self, account: WalletAddress) {
111 self.add_account(account).await;
112 }
113
114 async fn wait_for_fills(
115 &self,
116 account: &WalletAddress,
117 timeout: Duration,
118 ) -> Vec<hypercall_api::state::DirectiveHydromancerFill> {
119 self.wait_for_fills(account, timeout)
120 .await
121 .into_iter()
122 .map(|fill| hypercall_api::state::DirectiveHydromancerFill {
123 coin: fill.coin,
124 side: fill.side,
125 px: fill.px,
126 sz: fill.sz,
127 time: fill.time,
128 })
129 .collect()
130 }
131}
132
133impl HydromancerFeedService {
134 pub fn new(
135 ws_url: String,
136 hl_info_url: String,
137 hydromancer_api_url: String,
138 hydromancer_api_key: Option<String>,
139 initial_accounts: Vec<WalletAddress>,
140 ) -> Self {
141 let (event_tx, _) = broadcast::channel(1024);
142 Self {
143 ws_url,
144 accounts: Arc::new(RwLock::new(initial_accounts)),
145 positions: Arc::new(RwLock::new(HashMap::new())),
146 event_tx,
147 hl_info_url,
148 hydromancer_api_url,
149 hydromancer_api_key,
150 }
151 }
152
153 pub fn subscribe(&self) -> broadcast::Receiver<HydromancerEvent> {
154 self.event_tx.subscribe()
155 }
156
157 pub async fn add_account(&self, account: WalletAddress) {
158 let mut accounts = self.accounts.write().await;
159 if !accounts.contains(&account) {
160 accounts.push(account);
161 info!(account = %account, "HydromancerFeed: added account");
162 }
163 }
164
165 pub async fn get_positions(&self, account: &WalletAddress) -> Option<AccountState> {
167 self.positions.read().await.get(account).cloned()
168 }
169
170 pub async fn get_position_size(&self, account: &WalletAddress, coin: &str) -> f64 {
172 self.positions
173 .read()
174 .await
175 .get(account)
176 .and_then(|state| state.positions.iter().find(|p| p.coin == coin))
177 .map(|p| p.size)
178 .unwrap_or(0.0)
179 }
180
181 pub async fn wait_for_fills(
183 &self,
184 account: &WalletAddress,
185 timeout: Duration,
186 ) -> Vec<FillEvent> {
187 let mut rx = self.event_tx.subscribe();
188 let deadline = Instant::now() + timeout;
189 let mut fills = Vec::new();
190
191 loop {
192 let remaining = deadline.saturating_duration_since(Instant::now());
193 if remaining.is_zero() {
194 break;
195 }
196
197 match tokio::time::timeout(remaining, rx.recv()).await {
198 Ok(Ok(HydromancerEvent::Fill { account: a, fill })) if a == *account => {
199 fills.push(fill);
200 tokio::time::sleep(Duration::from_millis(200)).await;
201 }
202 Ok(Ok(_)) => continue,
203 Ok(Err(_)) => break,
204 Err(_) => break,
205 }
206 }
207
208 fills
209 }
210
211 pub async fn start(&self, mut shutdown_rx: broadcast::Receiver<()>) {
217 const BASE_BACKOFF: Duration = Duration::from_secs(1);
218 const MAX_BACKOFF: Duration = Duration::from_secs(30);
219 let mut consecutive_failures: u32 = 0;
220
221 loop {
222 let before = Instant::now();
223
224 self.reconcile_on_connect().await;
226
227 tokio::select! {
228 _ = shutdown_rx.recv() => {
229 info!("HydromancerFeed: shutdown signal received");
230 return;
231 }
232 _ = self.run_connection() => {
233 if before.elapsed() > Duration::from_secs(60) {
234 consecutive_failures = 0;
235 }
236 consecutive_failures += 1;
237 let backoff = std::cmp::min(
238 BASE_BACKOFF * 2u32.saturating_pow(consecutive_failures - 1),
239 MAX_BACKOFF,
240 );
241 warn!(
242 failures = consecutive_failures,
243 backoff_ms = backoff.as_millis() as u64,
244 "HydromancerFeed: connection lost, reconnecting"
245 );
246 tokio::select! {
247 _ = shutdown_rx.recv() => {
248 info!("HydromancerFeed: shutdown during backoff");
249 return;
250 }
251 _ = tokio::time::sleep(backoff) => {}
252 }
253 }
254 }
255 }
256 }
257
258 async fn reconcile_on_connect(&self) {
263 let accts = self.accounts.read().await.clone();
264 if accts.is_empty() {
265 return;
266 }
267
268 let client = match reqwest::Client::builder()
269 .timeout(Duration::from_secs(10))
270 .build()
271 {
272 Ok(c) => c,
273 Err(e) => {
274 error!("HydromancerFeed: failed to create HTTP client: {}", e);
275 return;
276 }
277 };
278
279 let now_ms = crate::shared::order_types::get_timestamp_millis();
280
281 for account in &accts {
282 let body = serde_json::json!({
284 "type": "portfolioState",
285 "user": format!("{}", account),
286 });
287
288 let mut request = client.post(&self.hydromancer_api_url).json(&body);
289 if let Some(ref key) = self.hydromancer_api_key {
290 request = request.header("Authorization", format!("Bearer {}", key));
291 }
292
293 match request.send().await {
294 Ok(resp) => {
295 if let Ok(state_json) = resp.json::<serde_json::Value>().await {
296 let account_state = parse_portfolio_state(&state_json, now_ms);
297 self.positions
298 .write()
299 .await
300 .insert(*account, account_state.clone());
301 let _ = self.event_tx.send(HydromancerEvent::PositionSnapshot {
302 account: *account,
303 state: account_state,
304 });
305 }
306 }
307 Err(e) => {
308 warn!(account = %account, "HydromancerFeed: portfolioState fetch failed: {}", e);
309 }
310 }
311
312 let fills_body = serde_json::json!({
314 "type": "userFillsByTime",
315 "user": format!("{}", account),
316 "startTime": now_ms.saturating_sub(60_000),
317 });
318
319 if let Ok(resp) = client
320 .post(&self.hl_info_url)
321 .json(&fills_body)
322 .send()
323 .await
324 {
325 if let Ok(fills_json) = resp.json::<Vec<serde_json::Value>>().await {
326 for fill_val in fills_json {
327 if let Ok(fill) = serde_json::from_value::<FillEvent>(fill_val) {
328 let _ = self.event_tx.send(HydromancerEvent::Fill {
329 account: *account,
330 fill,
331 });
332 }
333 }
334 }
335 }
336 }
337
338 info!(
339 accounts = accts.len(),
340 "HydromancerFeed: reconciliation complete"
341 );
342 }
343
344 async fn run_connection(&self) {
349 let redacted_url = self.ws_url.split("?token=").next().unwrap_or(&self.ws_url);
350 info!(url = %redacted_url, "HydromancerFeed: connecting");
351
352 let (ws_stream, _) = match tokio_tungstenite::connect_async(&self.ws_url).await {
353 Ok(conn) => {
354 info!("HydromancerFeed: connected");
355 conn
356 }
357 Err(e) => {
358 error!("HydromancerFeed: connection failed: {}", e);
359 return;
360 }
361 };
362
363 let (mut write, mut read) = ws_stream.split();
364
365 match tokio::time::timeout(Duration::from_secs(5), read.next()).await {
367 Ok(Some(Ok(Message::Text(text)))) => {
368 debug!("HydromancerFeed: {}", text);
369 }
370 _ => {
371 error!("HydromancerFeed: no connected message received");
372 return;
373 }
374 }
375
376 let accts = self.accounts.read().await;
378 let addresses: Vec<String> = accts.iter().map(|a| format!("{}", a)).collect();
379 drop(accts);
380
381 if addresses.is_empty() {
382 warn!("HydromancerFeed: no accounts to subscribe to");
383 }
384
385 for sub_type in &["userOrderUpdates", "userFills"] {
386 let sub = serde_json::json!({
387 "method": "subscribe",
388 "subscription": {
389 "type": sub_type,
390 "addresses": addresses,
391 }
392 });
393 let msg = serde_json::to_string(&sub).expect("serialize subscription");
394 if let Err(e) = write.send(Message::Text(msg)).await {
395 error!(
396 sub_type = sub_type,
397 "HydromancerFeed: subscribe failed: {}", e
398 );
399 return;
400 }
401 }
402
403 for _ in 0..2 {
405 match tokio::time::timeout(Duration::from_secs(5), read.next()).await {
406 Ok(Some(Ok(Message::Text(text)))) => {
407 debug!("HydromancerFeed: sub response: {}", text);
408 }
409 _ => {
410 warn!("HydromancerFeed: subscription confirmation timeout");
411 }
412 }
413 }
414
415 info!(
416 accounts = addresses.len(),
417 "HydromancerFeed: subscribed to userOrderUpdates + userFills"
418 );
419
420 const WS_READ_TIMEOUT: Duration = Duration::from_secs(60);
422
423 loop {
424 match tokio::time::timeout(WS_READ_TIMEOUT, read.next()).await {
425 Ok(Some(Ok(Message::Text(text)))) => {
426 self.handle_message(&text);
427 }
428 Ok(Some(Ok(Message::Ping(data)))) => {
429 if let Err(e) = write.send(Message::Pong(data)).await {
430 error!("HydromancerFeed: pong failed: {}", e);
431 return;
432 }
433 }
434 Ok(Some(Ok(Message::Close(_)))) => {
435 info!("HydromancerFeed: server closed connection");
436 return;
437 }
438 Ok(Some(Err(e))) => {
439 error!("HydromancerFeed: read error: {}", e);
440 return;
441 }
442 Ok(None) => {
443 info!("HydromancerFeed: stream ended");
444 return;
445 }
446 Err(_) => {
447 warn!(
448 "HydromancerFeed: read timeout ({}s), reconnecting",
449 WS_READ_TIMEOUT.as_secs()
450 );
451 return;
452 }
453 _ => {}
454 }
455 }
456 }
457
458 fn handle_message(&self, text: &str) {
459 let msg: serde_json::Value = match serde_json::from_str(text) {
460 Ok(v) => v,
461 Err(e) => {
462 warn!("HydromancerFeed: invalid JSON: {}", e);
463 return;
464 }
465 };
466
467 let channel = msg.get("channel").and_then(|v| v.as_str()).unwrap_or("");
468 let data = match msg.get("data") {
469 Some(d) => d,
470 None => return,
471 };
472
473 if channel.starts_with("userFills") {
474 self.handle_fills(data);
475 } else if channel.starts_with("userOrderUpdates") {
476 self.handle_order_updates(data);
477 }
478 }
479
480 fn handle_fills(&self, data: &serde_json::Value) {
481 let fills = match data.as_array() {
482 Some(a) => a,
483 None => return,
484 };
485
486 for fill_val in fills {
487 let account_str = fill_val.get("user").and_then(|v| v.as_str()).unwrap_or("");
488 let account = match account_str.parse::<WalletAddress>() {
489 Ok(a) => a,
490 Err(_) => continue,
491 };
492 let fill: FillEvent = match serde_json::from_value(fill_val.clone()) {
493 Ok(f) => f,
494 Err(_) => continue,
495 };
496
497 self.apply_fill_to_positions(&account, &fill);
499
500 let _ = self.event_tx.send(HydromancerEvent::Fill { account, fill });
501 }
502 }
503
504 fn handle_order_updates(&self, data: &serde_json::Value) {
505 let updates = match data.as_array() {
506 Some(a) => a,
507 None => return,
508 };
509
510 for update_val in updates {
511 let account_str = update_val
512 .get("user")
513 .and_then(|v| v.as_str())
514 .unwrap_or("");
515 let account = match account_str.parse::<WalletAddress>() {
516 Ok(a) => a,
517 Err(_) => continue,
518 };
519 let update: OrderUpdate = match serde_json::from_value(update_val.clone()) {
520 Ok(u) => u,
521 Err(_) => continue,
522 };
523 let _ = self
524 .event_tx
525 .send(HydromancerEvent::OrderUpdate { account, update });
526 }
527 }
528
529 fn apply_fill_to_positions(&self, account: &WalletAddress, fill: &FillEvent) {
532 let size: f64 = fill.sz.parse().unwrap_or(0.0);
533 let signed_size = if fill.side == "B" { size } else { -size };
534
535 if let Ok(mut positions) = self.positions.try_write() {
539 let state = positions.entry(*account).or_default();
540 if let Some(pos) = state.positions.iter_mut().find(|p| p.coin == fill.coin) {
541 pos.size += signed_size;
542 } else {
543 state.positions.push(PerpPosition {
544 coin: fill.coin.clone(),
545 size: signed_size,
546 entry_price: fill.px.parse().ok(),
547 unrealized_pnl: 0.0,
548 margin_used: 0.0,
549 });
550 }
551 state.last_updated_ms = fill.time;
552 }
553 }
554}
555
556fn parse_portfolio_state(json: &serde_json::Value, now_ms: u64) -> AccountState {
564 let ch = json.get("clearinghouseState").unwrap_or(json);
567
568 let ms = ch.get("marginSummary").unwrap_or(ch);
569
570 let perp_account_value = ms
571 .get("accountValue")
572 .and_then(|v| v.as_str())
573 .and_then(|s| s.parse::<f64>().ok())
574 .unwrap_or(0.0);
575
576 let total_margin_used = ms
577 .get("totalMarginUsed")
578 .and_then(|v| v.as_str())
579 .and_then(|s| s.parse::<f64>().ok())
580 .unwrap_or(0.0);
581
582 let withdrawable = ch
583 .get("withdrawable")
584 .and_then(|v| v.as_str())
585 .and_then(|s| s.parse::<f64>().ok())
586 .unwrap_or(0.0);
587
588 let positions = ch
589 .get("assetPositions")
590 .and_then(|v| v.as_array())
591 .map(|arr| {
592 arr.iter()
593 .filter_map(|ap| {
594 let pos = ap.get("position")?;
595 let coin = pos.get("coin")?.as_str()?.to_string();
596 let size = pos.get("szi")?.as_str()?.parse::<f64>().ok()?;
597 Some(PerpPosition {
598 coin,
599 size,
600 entry_price: pos
601 .get("entryPx")
602 .and_then(|v| v.as_str())
603 .and_then(|s| s.parse().ok()),
604 unrealized_pnl: pos
605 .get("unrealizedPnl")
606 .and_then(|v| v.as_str())
607 .and_then(|s| s.parse().ok())
608 .unwrap_or(0.0),
609 margin_used: pos
610 .get("marginUsed")
611 .and_then(|v| v.as_str())
612 .and_then(|s| s.parse().ok())
613 .unwrap_or(0.0),
614 })
615 })
616 .collect()
617 })
618 .unwrap_or_default();
619
620 let spot_usdc = json
622 .get("spotClearinghouseState")
623 .and_then(|spot| spot.get("balances"))
624 .and_then(|b| b.as_array())
625 .and_then(|balances| {
626 balances.iter().find_map(|entry| {
627 let coin = entry.get("coin")?.as_str()?;
628 if coin == "USDC" {
629 entry
630 .get("total")
631 .and_then(|v| v.as_str())
632 .and_then(|s| s.parse::<f64>().ok())
633 } else {
634 None
635 }
636 })
637 })
638 .unwrap_or(0.0);
639
640 let account_value = perp_account_value + spot_usdc;
641
642 AccountState {
643 account_value,
644 total_margin_used,
645 withdrawable,
646 positions,
647 last_updated_ms: now_ms,
648 }
649}
650
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// Fixed wallet address shared by the tests.
    fn test_account() -> WalletAddress {
        WalletAddress::from_str("0x72471e6cb5be1b2db2eb00c38b7bca66c8c310c5").unwrap()
    }

    /// Service pointing at placeholder endpoints, with no API key and no accounts.
    fn fake_service() -> HydromancerFeedService {
        HydromancerFeedService::new(
            "ws://fake".into(),
            "http://fake".into(),
            "http://fake-hydromancer".into(),
            None,
            vec![],
        )
    }

    /// Fill without order id or fee.
    fn make_fill(coin: &str, side: &str, px: &str, sz: &str, time: u64) -> FillEvent {
        FillEvent {
            coin: coin.into(),
            side: side.into(),
            px: px.into(),
            sz: sz.into(),
            time,
            oid: None,
            fee: None,
        }
    }

    /// Parses a JSON literal, panicking on malformed input.
    fn parse_json(raw: &str) -> serde_json::Value {
        serde_json::from_str(raw).unwrap()
    }

    /// Asserts that `actual` is strictly within `tolerance` of `expected`.
    fn assert_close(actual: f64, expected: f64, tolerance: f64) {
        assert!((actual - expected).abs() < tolerance);
    }

    // Nested response with one perp position and spot balances.
    #[test]
    fn test_parse_portfolio_state() {
        let json = parse_json(
            r#"{
                "clearinghouseState": {
                    "marginSummary": {
                        "accountValue": "350.0",
                        "totalNtlPos": "3100.0",
                        "totalRawUsd": "350.0",
                        "totalMarginUsed": "77.5"
                    },
                    "withdrawable": "272.5",
                    "assetPositions": [
                        {
                            "position": {
                                "coin": "BTC",
                                "szi": "-0.04",
                                "entryPx": "77610.5",
                                "positionValue": "3100.0",
                                "unrealizedPnl": "-20.0",
                                "marginUsed": "77.5",
                                "returnOnEquity": "-0.05",
                                "liquidationPx": null,
                                "leverage": {"type": "cross", "value": 40}
                            },
                            "type": "oneWay"
                        }
                    ],
                    "time": 1776489498654
                },
                "spotClearinghouseState": {
                    "balances": [
                        {"coin": "USDC", "total": "500.0", "hold": "0.0"},
                        {"coin": "HYPE", "total": "10.0", "hold": "0.0"}
                    ]
                }
            }"#,
        );

        let state = parse_portfolio_state(&json, 1000);

        // 350.0 perp account value + 500.0 spot USDC.
        assert_close(state.account_value, 850.0, 0.01);
        assert_close(state.total_margin_used, 77.5, 0.01);
        assert_close(state.withdrawable, 272.5, 0.01);
        assert_eq!(state.positions.len(), 1);

        let btc = &state.positions[0];
        assert_eq!(btc.coin, "BTC");
        assert_close(btc.size, -0.04, 0.0001);
        assert_eq!(btc.entry_price, Some(77610.5));
    }

    // Without a spot section the account value is the perp value alone.
    #[test]
    fn test_parse_portfolio_state_no_spot() {
        let json = parse_json(
            r#"{
                "clearinghouseState": {
                    "marginSummary": {"accountValue": "100.0", "totalNtlPos": "0.0", "totalRawUsd": "100.0", "totalMarginUsed": "0.0"},
                    "withdrawable": "100.0",
                    "assetPositions": []
                }
            }"#,
        );

        let state = parse_portfolio_state(&json, 0);
        assert_close(state.account_value, 100.0, 0.01);
        assert!(state.positions.is_empty());
    }

    // All-zero account with empty position and balance lists.
    #[test]
    fn test_parse_portfolio_state_empty() {
        let json = parse_json(
            r#"{
                "clearinghouseState": {
                    "marginSummary": {"accountValue": "0.0", "totalNtlPos": "0.0", "totalRawUsd": "0.0", "totalMarginUsed": "0.0"},
                    "withdrawable": "0.0",
                    "assetPositions": []
                },
                "spotClearinghouseState": {
                    "balances": []
                }
            }"#,
        );

        let state = parse_portfolio_state(&json, 0);
        assert_eq!(state.account_value, 0.0);
        assert!(state.positions.is_empty());
    }

    // Flat response shape: no `clearinghouseState` wrapper.
    #[test]
    fn test_parse_portfolio_state_backward_compat() {
        let json = parse_json(
            r#"{
                "marginSummary": {"accountValue": "200.0", "totalNtlPos": "0.0", "totalRawUsd": "200.0", "totalMarginUsed": "50.0"},
                "withdrawable": "150.0",
                "assetPositions": []
            }"#,
        );

        let state = parse_portfolio_state(&json, 0);
        assert_close(state.account_value, 200.0, 0.01);
        assert_close(state.total_margin_used, 50.0, 0.01);
    }

    // `oid` and `fee` may be absent from the payload.
    #[test]
    fn test_fill_event_serde() {
        let raw = r#"{"coin":"BTC","side":"B","px":"78734.0","sz":"0.01","time":1776489498654}"#;
        let fill = serde_json::from_str::<FillEvent>(raw).unwrap();
        assert_eq!(fill.coin, "BTC");
        assert_eq!(fill.side, "B");
        assert_eq!(fill.sz, "0.01");
        assert_eq!(fill.time, 1776489498654);
    }

    // camelCase keys such as `limitPx` map onto the snake_case fields.
    #[test]
    fn test_order_update_serde() {
        let raw = r#"{"coin":"BTC","oid":12345,"side":"B","sz":"0.01","limitPx":"80000","status":"filled"}"#;
        let update = serde_json::from_str::<OrderUpdate>(raw).unwrap();
        assert_eq!(update.coin, "BTC");
        assert_eq!(update.oid, Some(12345));
        assert_eq!(update.status.as_deref(), Some("filled"));
    }

    // Adding the same account twice keeps a single entry.
    #[tokio::test]
    async fn test_service_add_account() {
        let svc = fake_service();
        let account = test_account();

        for _ in 0..2 {
            svc.add_account(account).await;
        }

        let tracked = svc.accounts.read().await;
        assert_eq!(tracked.len(), 1);
    }

    // Unknown accounts have no state and a zero position size.
    #[tokio::test]
    async fn test_service_get_positions_empty() {
        let svc = fake_service();
        let account = test_account();

        assert!(svc.get_positions(&account).await.is_none());
        assert_eq!(svc.get_position_size(&account, "BTC").await, 0.0);
    }

    // A buy followed by a smaller sell leaves the difference.
    #[tokio::test]
    async fn test_service_apply_fill_to_positions() {
        let svc = fake_service();
        let account = test_account();

        // Start from an empty cached state for the account.
        svc.positions
            .write()
            .await
            .insert(account, AccountState::default());

        let buy = make_fill("BTC", "B", "77000", "0.01", 1000);
        svc.apply_fill_to_positions(&account, &buy);
        assert_close(svc.get_position_size(&account, "BTC").await, 0.01, 0.0001);

        let sell = make_fill("BTC", "S", "78000", "0.005", 2000);
        svc.apply_fill_to_positions(&account, &sell);
        assert_close(svc.get_position_size(&account, "BTC").await, 0.005, 0.0001);
    }

    // With no events at all the call returns empty after the timeout.
    #[tokio::test]
    async fn test_service_wait_for_fills_timeout() {
        let svc = fake_service();
        let account = test_account();

        let fills = svc
            .wait_for_fills(&account, Duration::from_millis(100))
            .await;
        assert!(fills.is_empty());
    }

    // A fill broadcast while waiting is returned to the waiter.
    #[tokio::test]
    async fn test_service_wait_for_fills_receives() {
        let svc = Arc::new(fake_service());
        let account = test_account();

        let publisher = svc.clone();
        tokio::spawn(async move {
            // Give the waiter time to subscribe before publishing.
            tokio::time::sleep(Duration::from_millis(50)).await;
            let _ = publisher.event_tx.send(HydromancerEvent::Fill {
                account,
                fill: make_fill("BTC", "B", "77000", "0.01", 1000),
            });
        });

        let fills = svc.wait_for_fills(&account, Duration::from_secs(1)).await;
        assert_eq!(fills.len(), 1);
        assert_eq!(fills[0].coin, "BTC");
        assert_eq!(fills[0].sz, "0.01");
    }

    // Fills that belong to a different account are not returned.
    #[tokio::test]
    async fn test_service_wait_ignores_other_accounts() {
        let svc = Arc::new(fake_service());
        let account = test_account();
        let other = WalletAddress::from_str("0x0000000000000000000000000000000000000001").unwrap();

        let publisher = svc.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let _ = publisher.event_tx.send(HydromancerEvent::Fill {
                account: other,
                fill: make_fill("ETH", "S", "2500", "1.0", 1000),
            });
        });

        let fills = svc
            .wait_for_fills(&account, Duration::from_millis(200))
            .await;
        assert!(fills.is_empty());
    }
}