Skip to main content

hypercall/hypercore/
hypercore_position_service.rs

1use anyhow::{Context, Result};
2use futures::FutureExt;
3use std::collections::HashMap;
4use std::panic::AssertUnwindSafe;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::RwLock;
8use tokio::time;
9use tracing::{debug, error, info, warn};
10
11use crate::hypercore::types::{
12    ClearinghouseRequest, ClearinghouseState, HypercoreAccountState, PerpPosition,
13};
14use crate::messaging::EventBusTrait;
15use crate::portfolio::HypercorePositionUpdate;
16use crate::shared::shutdown::ShutdownRx;
17use hypercall_db::TierReader;
18use hypercall_db_diesel::DatabaseHandler;
19use hypercall_types::EngineMessage;
20
/// Polls an HTTP info endpoint for the clearinghouse state of tracked accounts,
/// caches the latest state per account, and publishes position updates on the event bus.
pub struct HypercorePositionService {
    /// HTTP client used for every clearinghouse-state request.
    client: reqwest::Client,
    /// URL that clearinghouse-state requests are POSTed to.
    api_url: String,
    /// Most recently fetched state, keyed by account address string.
    cache: Arc<RwLock<HashMap<String, HypercoreAccountState>>>,
    /// Period of the polling ticker.
    update_interval: Duration,
    /// Bus whose sender receives `EngineMessage::HypercorePositionUpdate` messages.
    event_bus: Arc<dyn EventBusTrait>,
    /// Database handle used to load the list of accounts to monitor.
    db: Arc<DatabaseHandler>,
}
29
30impl HypercorePositionService {
31    pub fn new(
32        api_url: String,
33        update_interval: Duration,
34        event_bus: Arc<dyn EventBusTrait>,
35        db: Arc<DatabaseHandler>,
36    ) -> Self {
37        Self {
38            client: reqwest::Client::new(),
39            api_url,
40            cache: Arc::new(RwLock::new(HashMap::new())),
41            update_interval,
42            event_bus,
43            db,
44        }
45    }
46
47    pub fn with_default_url(
48        update_interval: Duration,
49        event_bus: Arc<dyn EventBusTrait>,
50        db: Arc<DatabaseHandler>,
51    ) -> Self {
52        let api_url = "https://api.hyperliquid.xyz/info".to_string();
53
54        Self::new(api_url, update_interval, event_bus, db)
55    }
56
    /// Start the service - fetches initial state and begins polling.
    ///
    /// This method:
    /// 1. Queries user_tiers table for all accounts to monitor
    /// 2. Fetches initial state for all accounts and emits snapshot updates
    /// 3. Starts polling loop that refreshes accounts from DB before each poll
    ///
    /// The polling loop runs on a task created with `tokio::spawn`, so this method
    /// returns as soon as the initial snapshot is done and the task has been spawned.
    ///
    /// # Errors
    /// Returns an error if the initial account list cannot be loaded from the database.
    pub async fn start(&self) -> Result<()> {
        // Propagates a DB failure before any background task is spawned.
        self.initialize_tracking().await?;
        self.start_poll_loop().await;
        Ok(())
    }
68
69    pub async fn run_with_shutdown(&self, mut shutdown_rx: ShutdownRx) -> Result<()> {
70        self.initialize_tracking().await?;
71
72        let cache = self.cache.clone();
73        let client = self.client.clone();
74        let api_url = self.api_url.clone();
75        let event_bus = self.event_bus.clone();
76        let db = self.db.clone();
77        let mut ticker = time::interval(self.update_interval);
78        const BASE_BACKOFF: Duration = Duration::from_secs(1);
79        const MAX_BACKOFF: Duration = Duration::from_secs(30);
80        let mut consecutive_panics: u32 = 0;
81
82        info!(
83            "HypercorePositionService: Starting shutdown-aware poll loop with {}s interval",
84            self.update_interval.as_secs()
85        );
86
87        loop {
88            tokio::select! {
89                _ = shutdown_rx.recv() => {
90                    info!("HypercorePositionService: Received shutdown signal");
91                    break;
92                }
93                _ = ticker.tick() => {
94                    let poll_result = AssertUnwindSafe(Self::poll_accounts_once(
95                        &cache,
96                        &client,
97                        &api_url,
98                        &event_bus,
99                        &db,
100                    ))
101                    .catch_unwind()
102                    .await;
103
104                    match poll_result {
105                        Ok(()) => {
106                            consecutive_panics = 0;
107                        }
108                        Err(panic_info) => {
109                            consecutive_panics += 1;
110                            let panic_msg = Self::panic_message(&panic_info);
111                            error!(
112                                "HypercorePositionService: Shutdown-aware poll loop panicked (consecutive: {}): {}",
113                                consecutive_panics,
114                                panic_msg
115                            );
116
117                            let backoff = std::cmp::min(
118                                BASE_BACKOFF * 2u32.saturating_pow(consecutive_panics - 1),
119                                MAX_BACKOFF,
120                            );
121                            warn!(
122                                "HypercorePositionService: Restarting shutdown-aware poll loop after {:?} backoff",
123                                backoff
124                            );
125
126                            tokio::select! {
127                                _ = shutdown_rx.recv() => {
128                                    info!("HypercorePositionService: Received shutdown signal during backoff");
129                                    return Ok(());
130                                }
131                                _ = time::sleep(backoff) => {}
132                            }
133                        }
134                    }
135                }
136            }
137        }
138
139        Ok(())
140    }
141
142    async fn initialize_tracking(&self) -> Result<()> {
143        let accounts = Self::load_accounts_from_db(&self.db)?;
144
145        if accounts.is_empty() {
146            info!("HypercorePositionService: No accounts to monitor from user_tiers table");
147        } else {
148            info!(
149                "HypercorePositionService: Starting with {} accounts from user_tiers",
150                accounts.len()
151            );
152            self.fetch_and_emit_snapshots(&accounts).await;
153        }
154
155        Ok(())
156    }
157
158    fn load_accounts_from_db(db: &Arc<DatabaseHandler>) -> Result<Vec<String>> {
159        let tiers = db
160            .get_all_user_tiers_sync()
161            .context("Failed to load user tiers from database")?;
162        Ok(tiers
163            .into_iter()
164            .map(|t| t.wallet_address.to_string())
165            .collect())
166    }
167
168    async fn fetch_and_emit_snapshots(&self, accounts: &[String]) {
169        info!(
170            "HypercorePositionService: Fetching initial snapshots for {} accounts",
171            accounts.len()
172        );
173
174        // Fetch all accounts in parallel
175        let futures: Vec<_> = accounts
176            .iter()
177            .map(|addr| self.fetch_account_state(addr.clone()))
178            .collect();
179
180        let results = futures::future::join_all(futures).await;
181
182        let mut cache = self.cache.write().await;
183        let sender = self.event_bus.get_sender();
184        let mut success_count = 0;
185        let mut position_count = 0;
186
187        for (account, state) in results.into_iter().flatten() {
188            // Emit snapshot for each position
189            for position in &state.perp_positions {
190                let Some(update) =
191                    Self::build_position_update(&account, position, state.last_updated, true)
192                else {
193                    continue;
194                };
195
196                if let Err(e) = sender.send(EngineMessage::HypercorePositionUpdate(update)) {
197                    error!(
198                        "HypercorePositionService: Failed to emit snapshot for {}/{}: {}",
199                        account, position.coin, e
200                    );
201                } else {
202                    position_count += 1;
203                }
204            }
205
206            cache.insert(account, state);
207            success_count += 1;
208        }
209
210        info!(
211            "HypercorePositionService: Initial snapshot complete - {} accounts, {} positions emitted",
212            success_count, position_count
213        );
214    }
215
216    async fn start_poll_loop(&self) {
217        let cache = self.cache.clone();
218        let client = self.client.clone();
219        let api_url = self.api_url.clone();
220        let interval = self.update_interval;
221        let event_bus = self.event_bus.clone();
222        let db = self.db.clone();
223
224        info!(
225            "HypercorePositionService: Starting poll loop with {}s interval (accounts refreshed from DB each iteration)",
226            interval.as_secs()
227        );
228
229        tokio::spawn(async move {
230            const BASE_BACKOFF: Duration = Duration::from_secs(1);
231            const MAX_BACKOFF: Duration = Duration::from_secs(30);
232            let mut consecutive_panics: u32 = 0;
233
234            loop {
235                // Wrap the entire poll loop in catch_unwind for panic recovery
236                let poll_result = AssertUnwindSafe(Self::run_poll_iteration(
237                    &cache, &client, &api_url, interval, &event_bus, &db,
238                ))
239                .catch_unwind()
240                .await;
241
242                match poll_result {
243                    Ok(()) => {
244                        // Iteration completed normally - this shouldn't happen since
245                        // run_poll_iteration runs forever, but handle it gracefully
246                        consecutive_panics = 0;
247                        warn!(
248                            "HypercorePositionService: Poll iteration exited unexpectedly, restarting"
249                        );
250                    }
251                    Err(panic_info) => {
252                        consecutive_panics += 1;
253
254                        // Extract panic message for logging
255                        let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
256                            s.to_string()
257                        } else if let Some(s) = panic_info.downcast_ref::<String>() {
258                            s.clone()
259                        } else {
260                            "Unknown panic".to_string()
261                        };
262
263                        error!(
264                            "HypercorePositionService: Poll loop panicked (consecutive: {}): {}",
265                            consecutive_panics, panic_msg
266                        );
267
268                        // Exponential backoff with cap
269                        let backoff = std::cmp::min(
270                            BASE_BACKOFF * 2u32.saturating_pow(consecutive_panics - 1),
271                            MAX_BACKOFF,
272                        );
273
274                        warn!(
275                            "HypercorePositionService: Restarting poll loop after {:?} backoff",
276                            backoff
277                        );
278
279                        time::sleep(backoff).await;
280                    }
281                }
282            }
283        });
284    }
285
286    /// Inner poll loop that runs until a panic occurs.
287    /// Separated from the supervisor to enable catch_unwind.
288    /// Refreshes accounts from DB before each poll iteration.
289    async fn run_poll_iteration(
290        cache: &Arc<RwLock<HashMap<String, HypercoreAccountState>>>,
291        client: &reqwest::Client,
292        api_url: &str,
293        interval: Duration,
294        event_bus: &Arc<dyn EventBusTrait>,
295        db: &Arc<DatabaseHandler>,
296    ) {
297        let mut ticker = time::interval(interval);
298        loop {
299            ticker.tick().await;
300
301            // Load current accounts from DB before each poll
302            let accounts = match Self::load_accounts_from_db(db) {
303                Ok(accounts) => accounts,
304                Err(e) => {
305                    error!(
306                        "HypercorePositionService: Failed to load accounts from DB: {}",
307                        e
308                    );
309                    continue;
310                }
311            };
312
313            if accounts.is_empty() {
314                debug!("HypercorePositionService: No accounts to poll");
315                continue;
316            }
317
318            debug!(
319                "HypercorePositionService: Polling {} accounts",
320                accounts.len()
321            );
322
323            // Check for new accounts (not in cache) to emit snapshots
324            let new_accounts: Vec<String> = {
325                let cache_read = cache.read().await;
326                accounts
327                    .iter()
328                    .filter(|a| !cache_read.contains_key(*a))
329                    .cloned()
330                    .collect()
331            };
332
333            if !new_accounts.is_empty() {
334                info!(
335                    "HypercorePositionService: Detected {} new accounts, will emit snapshots",
336                    new_accounts.len()
337                );
338            }
339
340            // Fetch all accounts in parallel
341            let futures: Vec<_> = accounts
342                .iter()
343                .map(|account| async {
344                    let result = Self::fetch_clearinghouse_state(client, api_url, account).await;
345                    (account.clone(), result)
346                })
347                .collect();
348
349            let results = futures::future::join_all(futures).await;
350
351            // Process results and update cache
352            let mut cache_write = cache.write().await;
353            let sender = event_bus.get_sender();
354
355            for (account, result) in results {
356                match result {
357                    Ok(new_state) => {
358                        let is_new_account = !cache_write.contains_key(&account);
359
360                        if is_new_account {
361                            for position in &new_state.perp_positions {
362                                let Some(update) = Self::build_position_update(
363                                    &account,
364                                    position,
365                                    new_state.last_updated,
366                                    true,
367                                ) else {
368                                    continue;
369                                };
370
371                                debug!(
372                                    "HypercorePositionService: Emitting snapshot for new account {}/{}: size={}, entry_price={}",
373                                    update.account, update.coin, update.size, update.entry_price
374                                );
375
376                                if let Err(e) =
377                                    sender.send(EngineMessage::HypercorePositionUpdate(update))
378                                {
379                                    error!(
380                                        "HypercorePositionService: Failed to emit snapshot: {}",
381                                        e
382                                    );
383                                }
384                            }
385                        } else {
386                            let old_state = cache_write.get(&account);
387                            let updates =
388                                Self::compute_position_diffs(&account, old_state, &new_state);
389
390                            for update in updates {
391                                debug!(
392                                    "HypercorePositionService: Emitting diff for {}/{}: size={}, entry_price={}",
393                                    update.account, update.coin, update.size, update.entry_price
394                                );
395                                if let Err(e) =
396                                    sender.send(EngineMessage::HypercorePositionUpdate(update))
397                                {
398                                    error!("HypercorePositionService: Failed to emit diff: {}", e);
399                                }
400                            }
401                        }
402
403                        cache_write.insert(account, new_state);
404                    }
405                    Err(e) => {
406                        error!(
407                            "HypercorePositionService: Failed to fetch state for {}: {}",
408                            account, e
409                        );
410                    }
411                }
412            }
413        }
414    }
415
416    fn compute_position_diffs(
417        account: &str,
418        old_state: Option<&HypercoreAccountState>,
419        new_state: &HypercoreAccountState,
420    ) -> Vec<HypercorePositionUpdate> {
421        let mut updates = Vec::new();
422
423        // Build map of old positions
424        let old_positions: HashMap<String, &PerpPosition> = old_state
425            .map(|s| {
426                s.perp_positions
427                    .iter()
428                    .map(|p| (p.coin.clone(), p))
429                    .collect()
430            })
431            .unwrap_or_default();
432
433        for new_pos in &new_state.perp_positions {
434            let should_emit = match old_positions.get(&new_pos.coin) {
435                Some(old_pos) => {
436                    let size_changed = (old_pos.size - new_pos.size).abs() > f64::EPSILON;
437                    let entry_price_changed = old_pos.entry_price != new_pos.entry_price;
438                    let unrealized_pnl_changed =
439                        (old_pos.unrealized_pnl - new_pos.unrealized_pnl).abs() > f64::EPSILON;
440                    size_changed || entry_price_changed || unrealized_pnl_changed
441                }
442                None => true,
443            };
444
445            if should_emit {
446                let Some(update) =
447                    Self::build_position_update(account, new_pos, new_state.last_updated, false)
448                else {
449                    continue;
450                };
451                updates.push(update);
452            }
453        }
454
455        // Check for closed positions (existed before but not now)
456        for (coin, _) in old_positions {
457            if !new_state.perp_positions.iter().any(|p| p.coin == coin) {
458                updates.push(HypercorePositionUpdate {
459                    account: account.to_string(),
460                    coin,
461                    size: 0.0,
462                    entry_price: 0.0,
463                    unrealized_pnl: 0.0,
464                    timestamp: new_state.last_updated,
465                    snapshot: false,
466                });
467            }
468        }
469
470        updates
471    }
472
473    async fn poll_accounts_once(
474        cache: &Arc<RwLock<HashMap<String, HypercoreAccountState>>>,
475        client: &reqwest::Client,
476        api_url: &str,
477        event_bus: &Arc<dyn EventBusTrait>,
478        db: &Arc<DatabaseHandler>,
479    ) {
480        let accounts = match Self::load_accounts_from_db(db) {
481            Ok(accounts) => accounts,
482            Err(e) => {
483                error!(
484                    "HypercorePositionService: Failed to load accounts from DB: {}",
485                    e
486                );
487                return;
488            }
489        };
490
491        if accounts.is_empty() {
492            debug!("HypercorePositionService: No accounts to poll");
493            return;
494        }
495
496        debug!(
497            "HypercorePositionService: Polling {} accounts",
498            accounts.len()
499        );
500
501        let new_accounts: Vec<String> = {
502            let cache_read = cache.read().await;
503            accounts
504                .iter()
505                .filter(|a| !cache_read.contains_key(*a))
506                .cloned()
507                .collect()
508        };
509
510        if !new_accounts.is_empty() {
511            info!(
512                "HypercorePositionService: Detected {} new accounts, will emit snapshots",
513                new_accounts.len()
514            );
515        }
516
517        let futures: Vec<_> = accounts
518            .iter()
519            .map(|account| async {
520                let result = Self::fetch_clearinghouse_state(client, api_url, account).await;
521                (account.clone(), result)
522            })
523            .collect();
524
525        let results = futures::future::join_all(futures).await;
526        let mut cache_write = cache.write().await;
527        let sender = event_bus.get_sender();
528
529        for (account, result) in results {
530            match result {
531                Ok(new_state) => {
532                    let is_new_account = !cache_write.contains_key(&account);
533
534                    if is_new_account {
535                        for position in &new_state.perp_positions {
536                            let Some(update) = Self::build_position_update(
537                                &account,
538                                position,
539                                new_state.last_updated,
540                                true,
541                            ) else {
542                                continue;
543                            };
544
545                            debug!(
546                                "HypercorePositionService: Emitting snapshot for new account {}/{}: size={}, entry_price={}",
547                                update.account, update.coin, update.size, update.entry_price
548                            );
549
550                            if let Err(e) =
551                                sender.send(EngineMessage::HypercorePositionUpdate(update))
552                            {
553                                error!("HypercorePositionService: Failed to emit snapshot: {}", e);
554                            }
555                        }
556                    } else {
557                        let old_state = cache_write.get(&account);
558                        let updates = Self::compute_position_diffs(&account, old_state, &new_state);
559
560                        for update in updates {
561                            debug!(
562                                "HypercorePositionService: Emitting diff for {}/{}: size={}, entry_price={}",
563                                update.account, update.coin, update.size, update.entry_price
564                            );
565                            if let Err(e) =
566                                sender.send(EngineMessage::HypercorePositionUpdate(update))
567                            {
568                                error!("HypercorePositionService: Failed to emit diff: {}", e);
569                            }
570                        }
571                    }
572
573                    cache_write.insert(account, new_state);
574                }
575                Err(e) => {
576                    error!(
577                        "HypercorePositionService: Failed to fetch state for {}: {}",
578                        account, e
579                    );
580                }
581            }
582        }
583    }
584
585    fn build_position_update(
586        account: &str,
587        position: &PerpPosition,
588        timestamp: u64,
589        snapshot: bool,
590    ) -> Option<HypercorePositionUpdate> {
591        let Some(entry_price) = position.entry_price else {
592            warn!(
593                "HypercorePositionService: Skipping {} update for {}/{} with missing entry_px",
594                if snapshot { "snapshot" } else { "diff" },
595                account,
596                position.coin
597            );
598            return None;
599        };
600
601        Some(HypercorePositionUpdate {
602            account: account.to_string(),
603            coin: position.coin.clone(),
604            size: position.size,
605            entry_price,
606            unrealized_pnl: position.unrealized_pnl,
607            timestamp,
608            snapshot,
609        })
610    }
611
612    fn panic_message(panic_info: &Box<dyn std::any::Any + Send>) -> String {
613        if let Some(s) = panic_info.downcast_ref::<&str>() {
614            s.to_string()
615        } else if let Some(s) = panic_info.downcast_ref::<String>() {
616            s.clone()
617        } else {
618            "Unknown panic".to_string()
619        }
620    }
621
622    async fn fetch_account_state(
623        &self,
624        address: String,
625    ) -> Option<(String, HypercoreAccountState)> {
626        match Self::fetch_clearinghouse_state(&self.client, &self.api_url, &address).await {
627            Ok(state) => {
628                info!(
629                    "HypercorePositionService: Fetched state for {}: account_value={}, positions={}",
630                    address,
631                    state.account_value,
632                    state.perp_positions.len()
633                );
634                Some((address, state))
635            }
636            Err(e) => {
637                error!(
638                    "HypercorePositionService: Failed to fetch state for {}: {}",
639                    address, e
640                );
641                None
642            }
643        }
644    }
645
646    async fn fetch_clearinghouse_state(
647        client: &reqwest::Client,
648        api_url: &str,
649        address: &str,
650    ) -> Result<HypercoreAccountState> {
651        debug!(
652            "HypercorePositionService: Sending request to {} for address {}",
653            api_url, address
654        );
655
656        let request = ClearinghouseRequest {
657            request_type: "clearinghouseState".to_string(),
658            user: address.to_string(),
659        };
660
661        let response = client
662            .post(api_url)
663            .json(&request)
664            .timeout(Duration::from_secs(10))
665            .send()
666            .await
667            .context("Failed to send request to Hyperliquid API")?;
668
669        let status = response.status();
670        debug!(
671            "HypercorePositionService: Received response with status {} for address {}",
672            status, address
673        );
674
675        if !status.is_success() {
676            let error_text = response
677                .text()
678                .await
679                .unwrap_or_else(|_| "unknown error".to_string());
680            anyhow::bail!("API request failed with status {}: {}", status, error_text);
681        }
682
683        let clearinghouse_state: ClearinghouseState = response
684            .json()
685            .await
686            .context("Failed to parse clearinghouse state response")?;
687
688        debug!(
689            "HypercorePositionService: Successfully parsed clearinghouse state for {}",
690            address
691        );
692        Self::convert_to_hypercore_state(clearinghouse_state)
693    }
694
695    fn parse_float(s: &str, field_name: &str) -> Result<f64> {
696        s.parse::<f64>().map_err(|e| {
697            warn!(
698                "HypercorePositionService: Failed to parse float for field '{}': value='{}', error={}",
699                field_name, s, e
700            );
701            anyhow::anyhow!(
702                "Invalid float value for field '{}': '{}' ({})",
703                field_name,
704                s,
705                e
706            )
707        })
708    }
709
710    fn parse_optional_float(s: &str, field_name: &str) -> Result<Option<f64>> {
711        if s.is_empty() {
712            return Ok(None);
713        }
714        Self::parse_float(s, field_name).map(Some)
715    }
716
717    fn convert_to_hypercore_state(state: ClearinghouseState) -> Result<HypercoreAccountState> {
718        let account_value = Self::parse_float(&state.margin_summary.account_value, "account_value")
719            .context("Failed to parse margin_summary.account_value")?;
720        let total_margin_used =
721            Self::parse_float(&state.margin_summary.total_margin_used, "total_margin_used")
722                .context("Failed to parse margin_summary.total_margin_used")?;
723        let withdrawable = Self::parse_float(&state.withdrawable, "withdrawable")
724            .context("Failed to parse withdrawable")?;
725
726        let mut perp_positions: Vec<PerpPosition> = Vec::with_capacity(state.asset_positions.len());
727
728        for asset in state.asset_positions {
729            let coin = asset.position.coin;
730            let size = Self::parse_float(&asset.position.szi, &format!("{}.szi", coin))
731                .with_context(|| format!("Failed to parse position size for {}", coin))?;
732            let entry_price = match &asset.position.entry_px {
733                Some(p) => Self::parse_optional_float(p, &format!("{}.entry_px", coin))
734                    .with_context(|| format!("Failed to parse entry price for {}", coin))?,
735                None => None,
736            };
737            let position_value = Self::parse_float(
738                &asset.position.position_value,
739                &format!("{}.position_value", coin),
740            )
741            .with_context(|| format!("Failed to parse position value for {}", coin))?;
742            let unrealized_pnl = Self::parse_float(
743                &asset.position.unrealized_pnl,
744                &format!("{}.unrealized_pnl", coin),
745            )
746            .with_context(|| format!("Failed to parse unrealized PnL for {}", coin))?;
747            let margin_used = Self::parse_float(
748                &asset.position.margin_used,
749                &format!("{}.margin_used", coin),
750            )
751            .with_context(|| format!("Failed to parse margin used for {}", coin))?;
752            let liquidation_price = match &asset.position.liquidation_px {
753                Some(p) => Self::parse_optional_float(p, &format!("{}.liquidation_px", coin))
754                    .with_context(|| format!("Failed to parse liquidation price for {}", coin))?,
755                None => None,
756            };
757
758            perp_positions.push(PerpPosition {
759                coin,
760                size,
761                entry_price,
762                position_value,
763                unrealized_pnl,
764                margin_used,
765                liquidation_price,
766            });
767        }
768
769        Ok(HypercoreAccountState {
770            account_value,
771            total_margin_used,
772            withdrawable,
773            perp_positions,
774            last_updated: state.time,
775        })
776    }
777
778    /// Get cached state for an address.
779    pub async fn get_cached_state(&self, address: &str) -> Option<HypercoreAccountState> {
780        let cache = self.cache.read().await;
781        cache.get(address).cloned()
782    }
783}
784
#[cfg(test)]
mod tests {
    //! Unit tests for `HypercorePositionService::compute_position_diffs`.
    //!
    //! Every test starts from the same baseline fixtures and overrides only the
    //! fields relevant to the scenario, so the difference between the "old" and
    //! "new" state is visible at a glance.
    use super::*;

    /// Account address passed to `compute_position_diffs` in every test.
    const ACCOUNT: &str = "0x123";

    /// Baseline account: no open positions, last updated at `100`.
    fn baseline_account() -> HypercoreAccountState {
        HypercoreAccountState {
            account_value: 1000.0,
            total_margin_used: 100.0,
            withdrawable: 900.0,
            perp_positions: vec![],
            last_updated: 100,
        }
    }

    /// Baseline BTC position of size `1.0` with zero unrealized PnL.
    fn btc_position() -> PerpPosition {
        PerpPosition {
            coin: "BTC".to_string(),
            size: 1.0,
            entry_price: Some(50000.0),
            position_value: 50000.0,
            unrealized_pnl: 0.0,
            margin_used: 100.0,
            liquidation_price: Some(45000.0),
        }
    }

    /// A position that appears in the new state but not the old one yields one
    /// non-snapshot update carrying the new size.
    #[test]
    fn test_compute_position_diffs_new_position() {
        let old_state = baseline_account();
        let new_state = HypercoreAccountState {
            perp_positions: vec![btc_position()],
            last_updated: 200,
            ..baseline_account()
        };

        let updates =
            HypercorePositionService::compute_position_diffs(ACCOUNT, Some(&old_state), &new_state);

        assert_eq!(updates.len(), 1);
        assert_eq!(updates[0].coin, "BTC");
        assert_eq!(updates[0].size, 1.0);
        assert!(!updates[0].snapshot);
    }

    /// A position that disappears from the new state yields one non-snapshot
    /// update with size `0.0`.
    #[test]
    fn test_compute_position_diffs_closed_position() {
        let old_state = HypercoreAccountState {
            perp_positions: vec![btc_position()],
            ..baseline_account()
        };
        let new_state = HypercoreAccountState {
            total_margin_used: 0.0,
            withdrawable: 1000.0,
            last_updated: 200,
            ..baseline_account()
        };

        let updates =
            HypercorePositionService::compute_position_diffs(ACCOUNT, Some(&old_state), &new_state);

        assert_eq!(updates.len(), 1);
        assert_eq!(updates[0].coin, "BTC");
        assert_eq!(updates[0].size, 0.0);
        assert!(!updates[0].snapshot);
    }

    /// A size change on an existing position yields one non-snapshot update
    /// carrying the new size.
    #[test]
    fn test_compute_position_diffs_size_change() {
        let old_state = HypercoreAccountState {
            perp_positions: vec![btc_position()],
            ..baseline_account()
        };
        let new_state = HypercoreAccountState {
            total_margin_used: 200.0,
            withdrawable: 800.0,
            perp_positions: vec![PerpPosition {
                size: 2.0, // Size changed from 1.0 to 2.0
                position_value: 100000.0,
                margin_used: 200.0,
                ..btc_position()
            }],
            last_updated: 200,
            ..baseline_account()
        };

        let updates =
            HypercorePositionService::compute_position_diffs(ACCOUNT, Some(&old_state), &new_state);

        assert_eq!(updates.len(), 1);
        assert_eq!(updates[0].coin, "BTC");
        assert_eq!(updates[0].size, 2.0);
        assert!(!updates[0].snapshot);
    }

    /// Size and entry price stay the same, but unrealized PnL (and with it the
    /// position value) moves; the test expects an update to still be emitted.
    #[test]
    fn test_compute_position_diffs_unrealized_pnl_change() {
        let old_state = HypercoreAccountState {
            perp_positions: vec![PerpPosition {
                unrealized_pnl: 100.0, // Starting unrealized PnL
                ..btc_position()
            }],
            ..baseline_account()
        };
        let new_state = HypercoreAccountState {
            account_value: 1100.0,
            withdrawable: 1000.0,
            perp_positions: vec![PerpPosition {
                // Size and entry price are inherited unchanged from the baseline.
                position_value: 51000.0,
                unrealized_pnl: 1000.0, // Different unrealized PnL — triggers update for equity
                ..btc_position()
            }],
            last_updated: 200,
            ..baseline_account()
        };

        let updates =
            HypercorePositionService::compute_position_diffs(ACCOUNT, Some(&old_state), &new_state);

        assert_eq!(updates.len(), 1, "unrealized PnL change should emit update");
    }
}