1use anyhow::{Context, Result};
2use futures::FutureExt;
3use std::collections::HashMap;
4use std::panic::AssertUnwindSafe;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::RwLock;
8use tokio::time;
9use tracing::{debug, error, info, warn};
10
11use crate::hypercore::types::{
12 ClearinghouseRequest, ClearinghouseState, HypercoreAccountState, PerpPosition,
13};
14use crate::messaging::EventBusTrait;
15use crate::portfolio::HypercorePositionUpdate;
16use crate::shared::shutdown::ShutdownRx;
17use hypercall_db::TierReader;
18use hypercall_db_diesel::DatabaseHandler;
19use hypercall_types::EngineMessage;
20
/// Polls the Hyperliquid info API for per-account clearinghouse state and
/// publishes perp position snapshots and diffs on the engine event bus.
pub struct HypercorePositionService {
    /// HTTP client used for the clearinghouse-state requests.
    client: reqwest::Client,
    /// Endpoint the `clearinghouseState` requests are POSTed to.
    api_url: String,
    /// Last successfully fetched state, keyed by account address.
    cache: Arc<RwLock<HashMap<String, HypercoreAccountState>>>,
    /// Period between polling rounds.
    update_interval: Duration,
    /// Bus whose sender receives `EngineMessage::HypercorePositionUpdate` messages.
    event_bus: Arc<dyn EventBusTrait>,
    /// Database handle; the accounts to monitor are read from its user tiers.
    db: Arc<DatabaseHandler>,
}
29
30impl HypercorePositionService {
31 pub fn new(
32 api_url: String,
33 update_interval: Duration,
34 event_bus: Arc<dyn EventBusTrait>,
35 db: Arc<DatabaseHandler>,
36 ) -> Self {
37 Self {
38 client: reqwest::Client::new(),
39 api_url,
40 cache: Arc::new(RwLock::new(HashMap::new())),
41 update_interval,
42 event_bus,
43 db,
44 }
45 }
46
47 pub fn with_default_url(
48 update_interval: Duration,
49 event_bus: Arc<dyn EventBusTrait>,
50 db: Arc<DatabaseHandler>,
51 ) -> Self {
52 let api_url = "https://api.hyperliquid.xyz/info".to_string();
53
54 Self::new(api_url, update_interval, event_bus, db)
55 }
56
57 pub async fn start(&self) -> Result<()> {
64 self.initialize_tracking().await?;
65 self.start_poll_loop().await;
66 Ok(())
67 }
68
69 pub async fn run_with_shutdown(&self, mut shutdown_rx: ShutdownRx) -> Result<()> {
70 self.initialize_tracking().await?;
71
72 let cache = self.cache.clone();
73 let client = self.client.clone();
74 let api_url = self.api_url.clone();
75 let event_bus = self.event_bus.clone();
76 let db = self.db.clone();
77 let mut ticker = time::interval(self.update_interval);
78 const BASE_BACKOFF: Duration = Duration::from_secs(1);
79 const MAX_BACKOFF: Duration = Duration::from_secs(30);
80 let mut consecutive_panics: u32 = 0;
81
82 info!(
83 "HypercorePositionService: Starting shutdown-aware poll loop with {}s interval",
84 self.update_interval.as_secs()
85 );
86
87 loop {
88 tokio::select! {
89 _ = shutdown_rx.recv() => {
90 info!("HypercorePositionService: Received shutdown signal");
91 break;
92 }
93 _ = ticker.tick() => {
94 let poll_result = AssertUnwindSafe(Self::poll_accounts_once(
95 &cache,
96 &client,
97 &api_url,
98 &event_bus,
99 &db,
100 ))
101 .catch_unwind()
102 .await;
103
104 match poll_result {
105 Ok(()) => {
106 consecutive_panics = 0;
107 }
108 Err(panic_info) => {
109 consecutive_panics += 1;
110 let panic_msg = Self::panic_message(&panic_info);
111 error!(
112 "HypercorePositionService: Shutdown-aware poll loop panicked (consecutive: {}): {}",
113 consecutive_panics,
114 panic_msg
115 );
116
117 let backoff = std::cmp::min(
118 BASE_BACKOFF * 2u32.saturating_pow(consecutive_panics - 1),
119 MAX_BACKOFF,
120 );
121 warn!(
122 "HypercorePositionService: Restarting shutdown-aware poll loop after {:?} backoff",
123 backoff
124 );
125
126 tokio::select! {
127 _ = shutdown_rx.recv() => {
128 info!("HypercorePositionService: Received shutdown signal during backoff");
129 return Ok(());
130 }
131 _ = time::sleep(backoff) => {}
132 }
133 }
134 }
135 }
136 }
137 }
138
139 Ok(())
140 }
141
142 async fn initialize_tracking(&self) -> Result<()> {
143 let accounts = Self::load_accounts_from_db(&self.db)?;
144
145 if accounts.is_empty() {
146 info!("HypercorePositionService: No accounts to monitor from user_tiers table");
147 } else {
148 info!(
149 "HypercorePositionService: Starting with {} accounts from user_tiers",
150 accounts.len()
151 );
152 self.fetch_and_emit_snapshots(&accounts).await;
153 }
154
155 Ok(())
156 }
157
158 fn load_accounts_from_db(db: &Arc<DatabaseHandler>) -> Result<Vec<String>> {
159 let tiers = db
160 .get_all_user_tiers_sync()
161 .context("Failed to load user tiers from database")?;
162 Ok(tiers
163 .into_iter()
164 .map(|t| t.wallet_address.to_string())
165 .collect())
166 }
167
168 async fn fetch_and_emit_snapshots(&self, accounts: &[String]) {
169 info!(
170 "HypercorePositionService: Fetching initial snapshots for {} accounts",
171 accounts.len()
172 );
173
174 let futures: Vec<_> = accounts
176 .iter()
177 .map(|addr| self.fetch_account_state(addr.clone()))
178 .collect();
179
180 let results = futures::future::join_all(futures).await;
181
182 let mut cache = self.cache.write().await;
183 let sender = self.event_bus.get_sender();
184 let mut success_count = 0;
185 let mut position_count = 0;
186
187 for (account, state) in results.into_iter().flatten() {
188 for position in &state.perp_positions {
190 let Some(update) =
191 Self::build_position_update(&account, position, state.last_updated, true)
192 else {
193 continue;
194 };
195
196 if let Err(e) = sender.send(EngineMessage::HypercorePositionUpdate(update)) {
197 error!(
198 "HypercorePositionService: Failed to emit snapshot for {}/{}: {}",
199 account, position.coin, e
200 );
201 } else {
202 position_count += 1;
203 }
204 }
205
206 cache.insert(account, state);
207 success_count += 1;
208 }
209
210 info!(
211 "HypercorePositionService: Initial snapshot complete - {} accounts, {} positions emitted",
212 success_count, position_count
213 );
214 }
215
216 async fn start_poll_loop(&self) {
217 let cache = self.cache.clone();
218 let client = self.client.clone();
219 let api_url = self.api_url.clone();
220 let interval = self.update_interval;
221 let event_bus = self.event_bus.clone();
222 let db = self.db.clone();
223
224 info!(
225 "HypercorePositionService: Starting poll loop with {}s interval (accounts refreshed from DB each iteration)",
226 interval.as_secs()
227 );
228
229 tokio::spawn(async move {
230 const BASE_BACKOFF: Duration = Duration::from_secs(1);
231 const MAX_BACKOFF: Duration = Duration::from_secs(30);
232 let mut consecutive_panics: u32 = 0;
233
234 loop {
235 let poll_result = AssertUnwindSafe(Self::run_poll_iteration(
237 &cache, &client, &api_url, interval, &event_bus, &db,
238 ))
239 .catch_unwind()
240 .await;
241
242 match poll_result {
243 Ok(()) => {
244 consecutive_panics = 0;
247 warn!(
248 "HypercorePositionService: Poll iteration exited unexpectedly, restarting"
249 );
250 }
251 Err(panic_info) => {
252 consecutive_panics += 1;
253
254 let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
256 s.to_string()
257 } else if let Some(s) = panic_info.downcast_ref::<String>() {
258 s.clone()
259 } else {
260 "Unknown panic".to_string()
261 };
262
263 error!(
264 "HypercorePositionService: Poll loop panicked (consecutive: {}): {}",
265 consecutive_panics, panic_msg
266 );
267
268 let backoff = std::cmp::min(
270 BASE_BACKOFF * 2u32.saturating_pow(consecutive_panics - 1),
271 MAX_BACKOFF,
272 );
273
274 warn!(
275 "HypercorePositionService: Restarting poll loop after {:?} backoff",
276 backoff
277 );
278
279 time::sleep(backoff).await;
280 }
281 }
282 }
283 });
284 }
285
286 async fn run_poll_iteration(
290 cache: &Arc<RwLock<HashMap<String, HypercoreAccountState>>>,
291 client: &reqwest::Client,
292 api_url: &str,
293 interval: Duration,
294 event_bus: &Arc<dyn EventBusTrait>,
295 db: &Arc<DatabaseHandler>,
296 ) {
297 let mut ticker = time::interval(interval);
298 loop {
299 ticker.tick().await;
300
301 let accounts = match Self::load_accounts_from_db(db) {
303 Ok(accounts) => accounts,
304 Err(e) => {
305 error!(
306 "HypercorePositionService: Failed to load accounts from DB: {}",
307 e
308 );
309 continue;
310 }
311 };
312
313 if accounts.is_empty() {
314 debug!("HypercorePositionService: No accounts to poll");
315 continue;
316 }
317
318 debug!(
319 "HypercorePositionService: Polling {} accounts",
320 accounts.len()
321 );
322
323 let new_accounts: Vec<String> = {
325 let cache_read = cache.read().await;
326 accounts
327 .iter()
328 .filter(|a| !cache_read.contains_key(*a))
329 .cloned()
330 .collect()
331 };
332
333 if !new_accounts.is_empty() {
334 info!(
335 "HypercorePositionService: Detected {} new accounts, will emit snapshots",
336 new_accounts.len()
337 );
338 }
339
340 let futures: Vec<_> = accounts
342 .iter()
343 .map(|account| async {
344 let result = Self::fetch_clearinghouse_state(client, api_url, account).await;
345 (account.clone(), result)
346 })
347 .collect();
348
349 let results = futures::future::join_all(futures).await;
350
351 let mut cache_write = cache.write().await;
353 let sender = event_bus.get_sender();
354
355 for (account, result) in results {
356 match result {
357 Ok(new_state) => {
358 let is_new_account = !cache_write.contains_key(&account);
359
360 if is_new_account {
361 for position in &new_state.perp_positions {
362 let Some(update) = Self::build_position_update(
363 &account,
364 position,
365 new_state.last_updated,
366 true,
367 ) else {
368 continue;
369 };
370
371 debug!(
372 "HypercorePositionService: Emitting snapshot for new account {}/{}: size={}, entry_price={}",
373 update.account, update.coin, update.size, update.entry_price
374 );
375
376 if let Err(e) =
377 sender.send(EngineMessage::HypercorePositionUpdate(update))
378 {
379 error!(
380 "HypercorePositionService: Failed to emit snapshot: {}",
381 e
382 );
383 }
384 }
385 } else {
386 let old_state = cache_write.get(&account);
387 let updates =
388 Self::compute_position_diffs(&account, old_state, &new_state);
389
390 for update in updates {
391 debug!(
392 "HypercorePositionService: Emitting diff for {}/{}: size={}, entry_price={}",
393 update.account, update.coin, update.size, update.entry_price
394 );
395 if let Err(e) =
396 sender.send(EngineMessage::HypercorePositionUpdate(update))
397 {
398 error!("HypercorePositionService: Failed to emit diff: {}", e);
399 }
400 }
401 }
402
403 cache_write.insert(account, new_state);
404 }
405 Err(e) => {
406 error!(
407 "HypercorePositionService: Failed to fetch state for {}: {}",
408 account, e
409 );
410 }
411 }
412 }
413 }
414 }
415
416 fn compute_position_diffs(
417 account: &str,
418 old_state: Option<&HypercoreAccountState>,
419 new_state: &HypercoreAccountState,
420 ) -> Vec<HypercorePositionUpdate> {
421 let mut updates = Vec::new();
422
423 let old_positions: HashMap<String, &PerpPosition> = old_state
425 .map(|s| {
426 s.perp_positions
427 .iter()
428 .map(|p| (p.coin.clone(), p))
429 .collect()
430 })
431 .unwrap_or_default();
432
433 for new_pos in &new_state.perp_positions {
434 let should_emit = match old_positions.get(&new_pos.coin) {
435 Some(old_pos) => {
436 let size_changed = (old_pos.size - new_pos.size).abs() > f64::EPSILON;
437 let entry_price_changed = old_pos.entry_price != new_pos.entry_price;
438 let unrealized_pnl_changed =
439 (old_pos.unrealized_pnl - new_pos.unrealized_pnl).abs() > f64::EPSILON;
440 size_changed || entry_price_changed || unrealized_pnl_changed
441 }
442 None => true,
443 };
444
445 if should_emit {
446 let Some(update) =
447 Self::build_position_update(account, new_pos, new_state.last_updated, false)
448 else {
449 continue;
450 };
451 updates.push(update);
452 }
453 }
454
455 for (coin, _) in old_positions {
457 if !new_state.perp_positions.iter().any(|p| p.coin == coin) {
458 updates.push(HypercorePositionUpdate {
459 account: account.to_string(),
460 coin,
461 size: 0.0,
462 entry_price: 0.0,
463 unrealized_pnl: 0.0,
464 timestamp: new_state.last_updated,
465 snapshot: false,
466 });
467 }
468 }
469
470 updates
471 }
472
473 async fn poll_accounts_once(
474 cache: &Arc<RwLock<HashMap<String, HypercoreAccountState>>>,
475 client: &reqwest::Client,
476 api_url: &str,
477 event_bus: &Arc<dyn EventBusTrait>,
478 db: &Arc<DatabaseHandler>,
479 ) {
480 let accounts = match Self::load_accounts_from_db(db) {
481 Ok(accounts) => accounts,
482 Err(e) => {
483 error!(
484 "HypercorePositionService: Failed to load accounts from DB: {}",
485 e
486 );
487 return;
488 }
489 };
490
491 if accounts.is_empty() {
492 debug!("HypercorePositionService: No accounts to poll");
493 return;
494 }
495
496 debug!(
497 "HypercorePositionService: Polling {} accounts",
498 accounts.len()
499 );
500
501 let new_accounts: Vec<String> = {
502 let cache_read = cache.read().await;
503 accounts
504 .iter()
505 .filter(|a| !cache_read.contains_key(*a))
506 .cloned()
507 .collect()
508 };
509
510 if !new_accounts.is_empty() {
511 info!(
512 "HypercorePositionService: Detected {} new accounts, will emit snapshots",
513 new_accounts.len()
514 );
515 }
516
517 let futures: Vec<_> = accounts
518 .iter()
519 .map(|account| async {
520 let result = Self::fetch_clearinghouse_state(client, api_url, account).await;
521 (account.clone(), result)
522 })
523 .collect();
524
525 let results = futures::future::join_all(futures).await;
526 let mut cache_write = cache.write().await;
527 let sender = event_bus.get_sender();
528
529 for (account, result) in results {
530 match result {
531 Ok(new_state) => {
532 let is_new_account = !cache_write.contains_key(&account);
533
534 if is_new_account {
535 for position in &new_state.perp_positions {
536 let Some(update) = Self::build_position_update(
537 &account,
538 position,
539 new_state.last_updated,
540 true,
541 ) else {
542 continue;
543 };
544
545 debug!(
546 "HypercorePositionService: Emitting snapshot for new account {}/{}: size={}, entry_price={}",
547 update.account, update.coin, update.size, update.entry_price
548 );
549
550 if let Err(e) =
551 sender.send(EngineMessage::HypercorePositionUpdate(update))
552 {
553 error!("HypercorePositionService: Failed to emit snapshot: {}", e);
554 }
555 }
556 } else {
557 let old_state = cache_write.get(&account);
558 let updates = Self::compute_position_diffs(&account, old_state, &new_state);
559
560 for update in updates {
561 debug!(
562 "HypercorePositionService: Emitting diff for {}/{}: size={}, entry_price={}",
563 update.account, update.coin, update.size, update.entry_price
564 );
565 if let Err(e) =
566 sender.send(EngineMessage::HypercorePositionUpdate(update))
567 {
568 error!("HypercorePositionService: Failed to emit diff: {}", e);
569 }
570 }
571 }
572
573 cache_write.insert(account, new_state);
574 }
575 Err(e) => {
576 error!(
577 "HypercorePositionService: Failed to fetch state for {}: {}",
578 account, e
579 );
580 }
581 }
582 }
583 }
584
585 fn build_position_update(
586 account: &str,
587 position: &PerpPosition,
588 timestamp: u64,
589 snapshot: bool,
590 ) -> Option<HypercorePositionUpdate> {
591 let Some(entry_price) = position.entry_price else {
592 warn!(
593 "HypercorePositionService: Skipping {} update for {}/{} with missing entry_px",
594 if snapshot { "snapshot" } else { "diff" },
595 account,
596 position.coin
597 );
598 return None;
599 };
600
601 Some(HypercorePositionUpdate {
602 account: account.to_string(),
603 coin: position.coin.clone(),
604 size: position.size,
605 entry_price,
606 unrealized_pnl: position.unrealized_pnl,
607 timestamp,
608 snapshot,
609 })
610 }
611
612 fn panic_message(panic_info: &Box<dyn std::any::Any + Send>) -> String {
613 if let Some(s) = panic_info.downcast_ref::<&str>() {
614 s.to_string()
615 } else if let Some(s) = panic_info.downcast_ref::<String>() {
616 s.clone()
617 } else {
618 "Unknown panic".to_string()
619 }
620 }
621
622 async fn fetch_account_state(
623 &self,
624 address: String,
625 ) -> Option<(String, HypercoreAccountState)> {
626 match Self::fetch_clearinghouse_state(&self.client, &self.api_url, &address).await {
627 Ok(state) => {
628 info!(
629 "HypercorePositionService: Fetched state for {}: account_value={}, positions={}",
630 address,
631 state.account_value,
632 state.perp_positions.len()
633 );
634 Some((address, state))
635 }
636 Err(e) => {
637 error!(
638 "HypercorePositionService: Failed to fetch state for {}: {}",
639 address, e
640 );
641 None
642 }
643 }
644 }
645
646 async fn fetch_clearinghouse_state(
647 client: &reqwest::Client,
648 api_url: &str,
649 address: &str,
650 ) -> Result<HypercoreAccountState> {
651 debug!(
652 "HypercorePositionService: Sending request to {} for address {}",
653 api_url, address
654 );
655
656 let request = ClearinghouseRequest {
657 request_type: "clearinghouseState".to_string(),
658 user: address.to_string(),
659 };
660
661 let response = client
662 .post(api_url)
663 .json(&request)
664 .timeout(Duration::from_secs(10))
665 .send()
666 .await
667 .context("Failed to send request to Hyperliquid API")?;
668
669 let status = response.status();
670 debug!(
671 "HypercorePositionService: Received response with status {} for address {}",
672 status, address
673 );
674
675 if !status.is_success() {
676 let error_text = response
677 .text()
678 .await
679 .unwrap_or_else(|_| "unknown error".to_string());
680 anyhow::bail!("API request failed with status {}: {}", status, error_text);
681 }
682
683 let clearinghouse_state: ClearinghouseState = response
684 .json()
685 .await
686 .context("Failed to parse clearinghouse state response")?;
687
688 debug!(
689 "HypercorePositionService: Successfully parsed clearinghouse state for {}",
690 address
691 );
692 Self::convert_to_hypercore_state(clearinghouse_state)
693 }
694
695 fn parse_float(s: &str, field_name: &str) -> Result<f64> {
696 s.parse::<f64>().map_err(|e| {
697 warn!(
698 "HypercorePositionService: Failed to parse float for field '{}': value='{}', error={}",
699 field_name, s, e
700 );
701 anyhow::anyhow!(
702 "Invalid float value for field '{}': '{}' ({})",
703 field_name,
704 s,
705 e
706 )
707 })
708 }
709
710 fn parse_optional_float(s: &str, field_name: &str) -> Result<Option<f64>> {
711 if s.is_empty() {
712 return Ok(None);
713 }
714 Self::parse_float(s, field_name).map(Some)
715 }
716
717 fn convert_to_hypercore_state(state: ClearinghouseState) -> Result<HypercoreAccountState> {
718 let account_value = Self::parse_float(&state.margin_summary.account_value, "account_value")
719 .context("Failed to parse margin_summary.account_value")?;
720 let total_margin_used =
721 Self::parse_float(&state.margin_summary.total_margin_used, "total_margin_used")
722 .context("Failed to parse margin_summary.total_margin_used")?;
723 let withdrawable = Self::parse_float(&state.withdrawable, "withdrawable")
724 .context("Failed to parse withdrawable")?;
725
726 let mut perp_positions: Vec<PerpPosition> = Vec::with_capacity(state.asset_positions.len());
727
728 for asset in state.asset_positions {
729 let coin = asset.position.coin;
730 let size = Self::parse_float(&asset.position.szi, &format!("{}.szi", coin))
731 .with_context(|| format!("Failed to parse position size for {}", coin))?;
732 let entry_price = match &asset.position.entry_px {
733 Some(p) => Self::parse_optional_float(p, &format!("{}.entry_px", coin))
734 .with_context(|| format!("Failed to parse entry price for {}", coin))?,
735 None => None,
736 };
737 let position_value = Self::parse_float(
738 &asset.position.position_value,
739 &format!("{}.position_value", coin),
740 )
741 .with_context(|| format!("Failed to parse position value for {}", coin))?;
742 let unrealized_pnl = Self::parse_float(
743 &asset.position.unrealized_pnl,
744 &format!("{}.unrealized_pnl", coin),
745 )
746 .with_context(|| format!("Failed to parse unrealized PnL for {}", coin))?;
747 let margin_used = Self::parse_float(
748 &asset.position.margin_used,
749 &format!("{}.margin_used", coin),
750 )
751 .with_context(|| format!("Failed to parse margin used for {}", coin))?;
752 let liquidation_price = match &asset.position.liquidation_px {
753 Some(p) => Self::parse_optional_float(p, &format!("{}.liquidation_px", coin))
754 .with_context(|| format!("Failed to parse liquidation price for {}", coin))?,
755 None => None,
756 };
757
758 perp_positions.push(PerpPosition {
759 coin,
760 size,
761 entry_price,
762 position_value,
763 unrealized_pnl,
764 margin_used,
765 liquidation_price,
766 });
767 }
768
769 Ok(HypercoreAccountState {
770 account_value,
771 total_margin_used,
772 withdrawable,
773 perp_positions,
774 last_updated: state.time,
775 })
776 }
777
778 pub async fn get_cached_state(&self, address: &str) -> Option<HypercoreAccountState> {
780 let cache = self.cache.read().await;
781 cache.get(address).cloned()
782 }
783}
784
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a BTC position with entry price 50000 and liquidation price 45000.
    fn btc_position(
        size: f64,
        position_value: f64,
        unrealized_pnl: f64,
        margin_used: f64,
    ) -> PerpPosition {
        PerpPosition {
            coin: "BTC".to_string(),
            size,
            entry_price: Some(50000.0),
            position_value,
            unrealized_pnl,
            margin_used,
            liquidation_price: Some(45000.0),
        }
    }

    /// Builds an account state from its summary figures and positions.
    fn account_state(
        account_value: f64,
        total_margin_used: f64,
        withdrawable: f64,
        last_updated: u64,
        perp_positions: Vec<PerpPosition>,
    ) -> HypercoreAccountState {
        HypercoreAccountState {
            account_value,
            total_margin_used,
            withdrawable,
            perp_positions,
            last_updated,
        }
    }

    #[test]
    fn test_compute_position_diffs_new_position() {
        let old_state = account_state(1000.0, 100.0, 900.0, 100, vec![]);
        let new_state = account_state(
            1000.0,
            100.0,
            900.0,
            200,
            vec![btc_position(1.0, 50000.0, 0.0, 100.0)],
        );

        let updates =
            HypercorePositionService::compute_position_diffs("0x123", Some(&old_state), &new_state);

        assert_eq!(updates.len(), 1);
        assert_eq!(updates[0].coin, "BTC");
        assert_eq!(updates[0].size, 1.0);
        assert!(!updates[0].snapshot);
    }

    #[test]
    fn test_compute_position_diffs_closed_position() {
        let old_state = account_state(
            1000.0,
            100.0,
            900.0,
            100,
            vec![btc_position(1.0, 50000.0, 0.0, 100.0)],
        );
        let new_state = account_state(1000.0, 0.0, 1000.0, 200, vec![]);

        let updates =
            HypercorePositionService::compute_position_diffs("0x123", Some(&old_state), &new_state);

        // A closed position is reported as a zero-sized diff.
        assert_eq!(updates.len(), 1);
        assert_eq!(updates[0].coin, "BTC");
        assert_eq!(updates[0].size, 0.0);
        assert!(!updates[0].snapshot);
    }

    #[test]
    fn test_compute_position_diffs_size_change() {
        let old_state = account_state(
            1000.0,
            100.0,
            900.0,
            100,
            vec![btc_position(1.0, 50000.0, 0.0, 100.0)],
        );
        let new_state = account_state(
            1000.0,
            200.0,
            800.0,
            200,
            vec![btc_position(2.0, 100000.0, 0.0, 200.0)],
        );

        let updates =
            HypercorePositionService::compute_position_diffs("0x123", Some(&old_state), &new_state);

        assert_eq!(updates.len(), 1);
        assert_eq!(updates[0].coin, "BTC");
        assert_eq!(updates[0].size, 2.0);
        assert!(!updates[0].snapshot);
    }

    #[test]
    fn test_compute_position_diffs_unrealized_pnl_change() {
        // Size and entry price are unchanged; only the unrealized PnL moves.
        let old_state = account_state(
            1000.0,
            100.0,
            900.0,
            100,
            vec![btc_position(1.0, 50000.0, 100.0, 100.0)],
        );
        let new_state = account_state(
            1100.0,
            100.0,
            1000.0,
            200,
            vec![btc_position(1.0, 51000.0, 1000.0, 100.0)],
        );

        let updates =
            HypercorePositionService::compute_position_diffs("0x123", Some(&old_state), &new_state);

        assert_eq!(updates.len(), 1, "unrealized PnL change should emit update");
    }

    #[test]
    fn test_compute_position_diffs_no_change() {
        // Same position on both sides; only the timestamp differs.
        let old_state = account_state(
            1000.0,
            100.0,
            900.0,
            100,
            vec![btc_position(1.0, 50000.0, 100.0, 100.0)],
        );
        let new_state = account_state(
            1000.0,
            100.0,
            900.0,
            200,
            vec![btc_position(1.0, 50000.0, 100.0, 100.0)],
        );

        let updates =
            HypercorePositionService::compute_position_diffs("0x123", Some(&old_state), &new_state);

        assert!(
            updates.is_empty(),
            "unchanged position should not emit an update"
        );
    }
}