Skip to main content

hypercall/rsm/unified_engine/
markets.rs

1//! Market lifecycle handlers for `UnifiedEngine`.
2
3use super::*;
4
5impl UnifiedEngine {
6    pub(super) fn apply_market_action(
7        &mut self,
8        command: crate::rsm::apply::MarketActionCommand,
9        timestamp: u64,
10        output: &mut crate::rsm::apply::ApplyOutput,
11    ) -> Result<(), EngineError> {
12        let message = command.message;
13        output.push(EngineMessage::MarketAction(message.clone()));
14        let market = &message.market;
15
16        let (status, reason) = match message.action {
17            MarketAction::CreateMarket => {
18                let is_call = market.option_type == MessageOptionType::Call;
19                match self.create_market_in_memory(
20                    market.symbol.clone(),
21                    market.strike,
22                    is_call,
23                    market.underlying.clone(),
24                    market.expiry as u32,
25                    output,
26                ) {
27                    Ok(()) => (MarketUpdateStatus::MarketCreated, None),
28                    Err(e) if e.contains("already exists") => {
29                        (MarketUpdateStatus::MarketAlreadyExists, None)
30                    }
31                    Err(e) => (MarketUpdateStatus::MarketCreationFailed, Some(e)),
32                }
33            }
34            MarketAction::DeleteMarket => {
35                match self.delete_market_in_memory(&market.symbol, output) {
36                    Ok(()) => (MarketUpdateStatus::MarketDeleted, None),
37                    Err(e) => (MarketUpdateStatus::MarketDeletionFailed, Some(e)),
38                }
39            }
40            MarketAction::ExpireMarket => {
41                let Some(context) = command.expiry_context else {
42                    return Err(EngineError::Market(
43                        "Manual market expiry requires explicit settlement context".to_string(),
44                    ));
45                };
46                let first_new_expiry_effect = output.expiry_effects.len();
47                let market_expired_update = self.process_expiry_tick_collecting(
48                    timestamp,
49                    context,
50                    output,
51                    Some(&market.symbol),
52                )?;
53                if Self::manual_expiry_settled(
54                    &output.expiry_effects[first_new_expiry_effect..],
55                    &market.symbol,
56                ) {
57                    if let Some(update) = market_expired_update {
58                        output.market_response = Some(update);
59                        return Ok(());
60                    }
61                    (MarketUpdateStatus::MarketExpired, None)
62                } else {
63                    (
64                        MarketUpdateStatus::MarketPendingSettlement,
65                        Some("Settlement pending, no settlement price available".to_string()),
66                    )
67                }
68            }
69        };
70
71        let update_msg = MarketUpdateMessage {
72            market: market.clone(),
73            status,
74            timestamp,
75            reason,
76        };
77        output.push(EngineMessage::MarketUpdate(update_msg.clone()));
78        output.market_response = Some(update_msg);
79        Ok(())
80    }
81
82    fn manual_expiry_settled(effects: &[crate::rsm::apply::ExpiryEffect], symbol: &str) -> bool {
83        effects.iter().any(|effect| {
84            matches!(
85                effect,
86                crate::rsm::apply::ExpiryEffect::UpdateInstrumentStatus { symbols, status }
87                    if status == "SETTLED" && symbols.iter().any(|effect_symbol| effect_symbol == symbol)
88            )
89        })
90    }
91
92    fn create_market_in_memory(
93        &mut self,
94        symbol: String,
95        strike: Decimal,
96        is_call: bool,
97        underlying_symbol: String,
98        expiry: u32,
99        output: &mut crate::rsm::apply::ApplyOutput,
100    ) -> Result<(), String> {
101        let option_type = if is_call {
102            hypercall_types::OptionType::Call
103        } else {
104            hypercall_types::OptionType::Put
105        };
106        let expiry_timestamp =
107            crate::rsm::margin_manager::expiry_date_to_timestamp(&underlying_symbol, expiry as u64);
108
109        if let Some(existing) = self.ctx.orderbooks.get(&symbol) {
110            if existing.expiry == expiry as u64
111                && existing.strike == strike
112                && existing.option_type == option_type
113            {
114                return Err(format!("Market already exists for symbol: {}", symbol));
115            }
116
117            return Err(format!("Market already exists for symbol: {}", symbol));
118        }
119
120        output
121            .market_effects
122            .push(crate::rsm::apply::MarketEffect::SaveMarketAndInstrument {
123                underlying: underlying_symbol.clone(),
124                expiry: expiry as i64,
125                instrument: hypercall_db::InstrumentRecord {
126                    instrument_numeric_id: 0,
127                    id: symbol.clone(),
128                    underlying: underlying_symbol.clone(),
129                    strike,
130                    expiry: expiry as i64,
131                    option_type,
132                    option_token_address: None,
133                    status: hypercall_types::api_models::InstrumentStatus::Active,
134                    trading_mode: "orderbook".to_string(),
135                },
136            });
137
138        let orderbook = OrderBook::with_symbol(expiry as u64, strike, option_type, symbol.clone());
139        self.ctx.orderbooks.insert(symbol.clone(), orderbook);
140        self.ctx
141            .instrument_trading_modes
142            .insert(symbol.clone(), hypercall_types::TradingModes::ORDERBOOK);
143
144        self.expiry_manager
145            .expiry_schedules
146            .entry(expiry_timestamp)
147            .or_default()
148            .push(symbol.clone());
149
150        output
151            .market_effects
152            .push(crate::rsm::apply::MarketEffect::RegisterSettlement {
153                underlying: underlying_symbol,
154                symbol,
155                expiry_ts: expiry_timestamp as i64,
156                twap_window_seconds: TWAP_WINDOW_SECONDS,
157            });
158        Ok(())
159    }
160
161    fn delete_market_in_memory(
162        &mut self,
163        symbol: &str,
164        output: &mut crate::rsm::apply::ApplyOutput,
165    ) -> Result<(), String> {
166        if !self.ctx.orderbooks.contains_key(symbol) {
167            return Err(format!("Market does not exist for symbol: {}", symbol));
168        }
169        let orderbook = self.ctx.orderbooks.get(symbol).unwrap();
170        if orderbook.has_open_orders() {
171            return Err(format!(
172                "Cannot delete market with open orders for symbol: {}",
173                symbol
174            ));
175        }
176
177        output
178            .market_effects
179            .push(crate::rsm::apply::MarketEffect::DeleteMarketAndInstrument {
180                symbol: symbol.to_string(),
181            });
182
183        self.ctx.orderbooks.remove(symbol);
184        self.ctx.instrument_trading_modes.remove(symbol);
185        self.expiry_manager.expiry_schedules.retain(|_, symbols| {
186            symbols.retain(|scheduled_symbol| scheduled_symbol != symbol);
187            !symbols.is_empty()
188        });
189        self.ctx.expired_instruments.remove(symbol);
190        Ok(())
191    }
192
193    /// Create a new market (orderbook) for trading
194    pub fn create_market(
195        &mut self,
196        symbol: String,
197        strike: Decimal,
198        is_call: bool,
199        underlying_symbol: String,
200        expiry: u32,
201    ) -> Result<String, String> {
202        // Check if market already exists
203        if self.ctx.orderbooks.contains_key(&symbol) {
204            return Err(format!("Market already exists for symbol: {}", symbol));
205        }
206
207        // Determine option type
208        let option_type = if is_call {
209            hypercall_types::OptionType::Call
210        } else {
211            hypercall_types::OptionType::Put
212        };
213
214        // Persist first so an invariant failure cannot leave in-memory state ahead of durable state.
215        if let Some(ref handler) = self.ctx.db {
216            let new_instrument = hypercall_db::InstrumentRecord {
217                instrument_numeric_id: 0,
218                id: symbol.clone(),
219                underlying: underlying_symbol.clone(),
220                strike,
221                expiry: expiry as i64,
222                option_type,
223                status: hypercall_types::api_models::InstrumentStatus::Active,
224                option_token_address: None,
225                trading_mode: "orderbook".to_string(),
226            };
227
228            if let Err(e) = handler.save_market_and_instrument_sync(
229                &underlying_symbol,
230                expiry as i64,
231                &new_instrument,
232            ) {
233                panic!(
234                    "CRITICAL_FAILURE: Failed to persist market {} to database: {}. \
235                     Market will be lost on restart. Restart required.",
236                    symbol, e
237                );
238            }
239        }
240
241        // Create the orderbook
242        let orderbook = OrderBook::with_symbol(expiry as u64, strike, option_type, symbol.clone());
243
244        self.ctx.orderbooks.insert(symbol.clone(), orderbook);
245        self.ctx
246            .instrument_trading_modes
247            .insert(symbol.clone(), hypercall_types::TradingModes::ORDERBOOK);
248
249        // Track expiry - convert YYYYMMDD format to timestamp
250        let expiry_timestamp =
251            crate::rsm::margin_manager::expiry_date_to_timestamp(&underlying_symbol, expiry as u64);
252        self.expiry_manager
253            .expiry_schedules
254            .entry(expiry_timestamp)
255            .or_default()
256            .push(symbol.clone());
257
258        // Register settlement TWAP window with oracle (30 min before expiry)
259        // This starts TWAP sampling so settlement price is available at expiry
260        if let Some(oracle) = self.ctx.deps.mark_price_oracles.get(&underlying_symbol) {
261            let expiry_ts_i64 = expiry_timestamp as i64;
262            let oracle = oracle.clone();
263            let symbol_for_log = symbol.clone();
264            tokio::spawn(async move {
265                oracle
266                    .register_settlement(expiry_ts_i64, TWAP_WINDOW_SECONDS)
267                    .await;
268                debug!(
269                    "Registered TWAP settlement for {} at expiry {}",
270                    symbol_for_log, expiry_ts_i64
271                );
272            });
273        }
274
275        info!(
276            "Created market for symbol: {} with expiry at {}",
277            symbol, expiry_timestamp
278        );
279        Ok(format!(
280            "Market created successfully for symbol: {}",
281            symbol
282        ))
283    }
284
285    /// Delete an existing market (orderbook)
286    pub fn delete_market(&mut self, symbol: String) -> Result<String, String> {
287        // Check if market exists
288        if !self.ctx.orderbooks.contains_key(&symbol) {
289            return Err(format!("Market does not exist for symbol: {}", symbol));
290        }
291
292        // Check if orderbook has any open orders
293        let orderbook = self.ctx.orderbooks.get(&symbol).unwrap();
294        if orderbook.has_open_orders() {
295            return Err(format!(
296                "Cannot delete market with open orders for symbol: {}",
297                symbol
298            ));
299        }
300
301        if let Some(ref handler) = self.ctx.db {
302            if let Err(e) = handler.delete_market_and_instrument_sync(&symbol) {
303                panic!(
304                    "CRITICAL_FAILURE: Failed to delete market {} from database: {}. \
305                     In-memory and persisted state would diverge.",
306                    symbol, e
307                );
308            }
309        }
310
311        // Remove the orderbook
312        self.ctx.orderbooks.remove(&symbol);
313        self.ctx.instrument_trading_modes.remove(&symbol);
314
315        // Remove from expiry schedules and related expiry tracking.
316        self.expiry_manager.expiry_schedules.retain(|_, symbols| {
317            symbols.retain(|scheduled_symbol| scheduled_symbol != &symbol);
318            !symbols.is_empty()
319        });
320        self.ctx.expired_instruments.remove(&symbol);
321
322        info!("Deleted market for symbol: {}", symbol);
323        Ok(format!(
324            "Market deleted successfully for symbol: {}",
325            symbol
326        ))
327    }
328
329    /// Apply a `HashMap<underlying, TradingModes>` update from the
330    /// catalog manager's notify channel. For every instrument the
331    /// engine currently knows about, if its underlying is in the
332    /// update map, rewrite the in-memory `instrument_trading_modes`
333    /// entry to the new mode. Instruments whose underlying isn't in
334    /// the map are left untouched.
335    ///
336    /// This closes the "catalog flip drifts for 60s" gap: prior to
337    /// this hook, the engine's `instrument_trading_modes` only got
338    /// populated on recovery + CreateMarket, so a live catalog rewrite
339    /// (e.g. GOLD flipped from `orderbook` to `rfq`) would not be
340    /// observable to `plan_rfq_execution` or the order admission path
341    /// until the engine next restarted. The catalog manager sends the
342    /// new map on a `tokio::sync::watch` channel whenever it updates
343    /// any row, and the engine main loop calls this helper.
344    pub fn apply_underlying_trading_mode_update(
345        &mut self,
346        update: &std::collections::HashMap<String, hypercall_types::TradingModes>,
347    ) {
348        if update.is_empty() {
349            return;
350        }
351        let mut changed = 0usize;
352        let mut checked = 0usize;
353        let symbols: Vec<String> = self.ctx.instrument_trading_modes.keys().cloned().collect();
354        for symbol in symbols {
355            checked += 1;
356            let Some(underlying) = symbol.split('-').next() else {
357                continue;
358            };
359            if let Some(new_mode) = update.get(underlying) {
360                let prev = self
361                    .ctx
362                    .instrument_trading_modes
363                    .insert(symbol.clone(), *new_mode);
364                if prev.map(|p| p != *new_mode).unwrap_or(true) {
365                    changed += 1;
366                    info!(
367                        symbol = %symbol,
368                        underlying = %underlying,
369                        old_mode = ?prev,
370                        new_mode = ?new_mode,
371                        "Applied live trading_mode update from catalog manager"
372                    );
373                }
374            }
375        }
376        if changed > 0 {
377            info!(
378                changed,
379                checked,
380                update_size = update.len(),
381                "apply_underlying_trading_mode_update: updated live trading_modes"
382            );
383        }
384    }
385}