// hypercall/rsm/unified_engine/markets.rs
use super::*;
4
5impl UnifiedEngine {
6 pub(super) fn apply_market_action(
7 &mut self,
8 command: crate::rsm::apply::MarketActionCommand,
9 timestamp: u64,
10 output: &mut crate::rsm::apply::ApplyOutput,
11 ) -> Result<(), EngineError> {
12 let message = command.message;
13 output.push(EngineMessage::MarketAction(message.clone()));
14 let market = &message.market;
15
16 let (status, reason) = match message.action {
17 MarketAction::CreateMarket => {
18 let is_call = market.option_type == MessageOptionType::Call;
19 match self.create_market_in_memory(
20 market.symbol.clone(),
21 market.strike,
22 is_call,
23 market.underlying.clone(),
24 market.expiry as u32,
25 output,
26 ) {
27 Ok(()) => (MarketUpdateStatus::MarketCreated, None),
28 Err(e) if e.contains("already exists") => {
29 (MarketUpdateStatus::MarketAlreadyExists, None)
30 }
31 Err(e) => (MarketUpdateStatus::MarketCreationFailed, Some(e)),
32 }
33 }
34 MarketAction::DeleteMarket => {
35 match self.delete_market_in_memory(&market.symbol, output) {
36 Ok(()) => (MarketUpdateStatus::MarketDeleted, None),
37 Err(e) => (MarketUpdateStatus::MarketDeletionFailed, Some(e)),
38 }
39 }
40 MarketAction::ExpireMarket => {
41 let Some(context) = command.expiry_context else {
42 return Err(EngineError::Market(
43 "Manual market expiry requires explicit settlement context".to_string(),
44 ));
45 };
46 let first_new_expiry_effect = output.expiry_effects.len();
47 let market_expired_update = self.process_expiry_tick_collecting(
48 timestamp,
49 context,
50 output,
51 Some(&market.symbol),
52 )?;
53 if Self::manual_expiry_settled(
54 &output.expiry_effects[first_new_expiry_effect..],
55 &market.symbol,
56 ) {
57 if let Some(update) = market_expired_update {
58 output.market_response = Some(update);
59 return Ok(());
60 }
61 (MarketUpdateStatus::MarketExpired, None)
62 } else {
63 (
64 MarketUpdateStatus::MarketPendingSettlement,
65 Some("Settlement pending, no settlement price available".to_string()),
66 )
67 }
68 }
69 };
70
71 let update_msg = MarketUpdateMessage {
72 market: market.clone(),
73 status,
74 timestamp,
75 reason,
76 };
77 output.push(EngineMessage::MarketUpdate(update_msg.clone()));
78 output.market_response = Some(update_msg);
79 Ok(())
80 }
81
82 fn manual_expiry_settled(effects: &[crate::rsm::apply::ExpiryEffect], symbol: &str) -> bool {
83 effects.iter().any(|effect| {
84 matches!(
85 effect,
86 crate::rsm::apply::ExpiryEffect::UpdateInstrumentStatus { symbols, status }
87 if status == "SETTLED" && symbols.iter().any(|effect_symbol| effect_symbol == symbol)
88 )
89 })
90 }
91
92 fn create_market_in_memory(
93 &mut self,
94 symbol: String,
95 strike: Decimal,
96 is_call: bool,
97 underlying_symbol: String,
98 expiry: u32,
99 output: &mut crate::rsm::apply::ApplyOutput,
100 ) -> Result<(), String> {
101 let option_type = if is_call {
102 hypercall_types::OptionType::Call
103 } else {
104 hypercall_types::OptionType::Put
105 };
106 let expiry_timestamp =
107 crate::rsm::margin_manager::expiry_date_to_timestamp(&underlying_symbol, expiry as u64);
108
109 if let Some(existing) = self.ctx.orderbooks.get(&symbol) {
110 if existing.expiry == expiry as u64
111 && existing.strike == strike
112 && existing.option_type == option_type
113 {
114 return Err(format!("Market already exists for symbol: {}", symbol));
115 }
116
117 return Err(format!("Market already exists for symbol: {}", symbol));
118 }
119
120 output
121 .market_effects
122 .push(crate::rsm::apply::MarketEffect::SaveMarketAndInstrument {
123 underlying: underlying_symbol.clone(),
124 expiry: expiry as i64,
125 instrument: hypercall_db::InstrumentRecord {
126 instrument_numeric_id: 0,
127 id: symbol.clone(),
128 underlying: underlying_symbol.clone(),
129 strike,
130 expiry: expiry as i64,
131 option_type,
132 option_token_address: None,
133 status: hypercall_types::api_models::InstrumentStatus::Active,
134 trading_mode: "orderbook".to_string(),
135 },
136 });
137
138 let orderbook = OrderBook::with_symbol(expiry as u64, strike, option_type, symbol.clone());
139 self.ctx.orderbooks.insert(symbol.clone(), orderbook);
140 self.ctx
141 .instrument_trading_modes
142 .insert(symbol.clone(), hypercall_types::TradingModes::ORDERBOOK);
143
144 self.expiry_manager
145 .expiry_schedules
146 .entry(expiry_timestamp)
147 .or_default()
148 .push(symbol.clone());
149
150 output
151 .market_effects
152 .push(crate::rsm::apply::MarketEffect::RegisterSettlement {
153 underlying: underlying_symbol,
154 symbol,
155 expiry_ts: expiry_timestamp as i64,
156 twap_window_seconds: TWAP_WINDOW_SECONDS,
157 });
158 Ok(())
159 }
160
161 fn delete_market_in_memory(
162 &mut self,
163 symbol: &str,
164 output: &mut crate::rsm::apply::ApplyOutput,
165 ) -> Result<(), String> {
166 if !self.ctx.orderbooks.contains_key(symbol) {
167 return Err(format!("Market does not exist for symbol: {}", symbol));
168 }
169 let orderbook = self.ctx.orderbooks.get(symbol).unwrap();
170 if orderbook.has_open_orders() {
171 return Err(format!(
172 "Cannot delete market with open orders for symbol: {}",
173 symbol
174 ));
175 }
176
177 output
178 .market_effects
179 .push(crate::rsm::apply::MarketEffect::DeleteMarketAndInstrument {
180 symbol: symbol.to_string(),
181 });
182
183 self.ctx.orderbooks.remove(symbol);
184 self.ctx.instrument_trading_modes.remove(symbol);
185 self.expiry_manager.expiry_schedules.retain(|_, symbols| {
186 symbols.retain(|scheduled_symbol| scheduled_symbol != symbol);
187 !symbols.is_empty()
188 });
189 self.ctx.expired_instruments.remove(symbol);
190 Ok(())
191 }
192
193 pub fn create_market(
195 &mut self,
196 symbol: String,
197 strike: Decimal,
198 is_call: bool,
199 underlying_symbol: String,
200 expiry: u32,
201 ) -> Result<String, String> {
202 if self.ctx.orderbooks.contains_key(&symbol) {
204 return Err(format!("Market already exists for symbol: {}", symbol));
205 }
206
207 let option_type = if is_call {
209 hypercall_types::OptionType::Call
210 } else {
211 hypercall_types::OptionType::Put
212 };
213
214 if let Some(ref handler) = self.ctx.db {
216 let new_instrument = hypercall_db::InstrumentRecord {
217 instrument_numeric_id: 0,
218 id: symbol.clone(),
219 underlying: underlying_symbol.clone(),
220 strike,
221 expiry: expiry as i64,
222 option_type,
223 status: hypercall_types::api_models::InstrumentStatus::Active,
224 option_token_address: None,
225 trading_mode: "orderbook".to_string(),
226 };
227
228 if let Err(e) = handler.save_market_and_instrument_sync(
229 &underlying_symbol,
230 expiry as i64,
231 &new_instrument,
232 ) {
233 panic!(
234 "CRITICAL_FAILURE: Failed to persist market {} to database: {}. \
235 Market will be lost on restart. Restart required.",
236 symbol, e
237 );
238 }
239 }
240
241 let orderbook = OrderBook::with_symbol(expiry as u64, strike, option_type, symbol.clone());
243
244 self.ctx.orderbooks.insert(symbol.clone(), orderbook);
245 self.ctx
246 .instrument_trading_modes
247 .insert(symbol.clone(), hypercall_types::TradingModes::ORDERBOOK);
248
249 let expiry_timestamp =
251 crate::rsm::margin_manager::expiry_date_to_timestamp(&underlying_symbol, expiry as u64);
252 self.expiry_manager
253 .expiry_schedules
254 .entry(expiry_timestamp)
255 .or_default()
256 .push(symbol.clone());
257
258 if let Some(oracle) = self.ctx.deps.mark_price_oracles.get(&underlying_symbol) {
261 let expiry_ts_i64 = expiry_timestamp as i64;
262 let oracle = oracle.clone();
263 let symbol_for_log = symbol.clone();
264 tokio::spawn(async move {
265 oracle
266 .register_settlement(expiry_ts_i64, TWAP_WINDOW_SECONDS)
267 .await;
268 debug!(
269 "Registered TWAP settlement for {} at expiry {}",
270 symbol_for_log, expiry_ts_i64
271 );
272 });
273 }
274
275 info!(
276 "Created market for symbol: {} with expiry at {}",
277 symbol, expiry_timestamp
278 );
279 Ok(format!(
280 "Market created successfully for symbol: {}",
281 symbol
282 ))
283 }
284
285 pub fn delete_market(&mut self, symbol: String) -> Result<String, String> {
287 if !self.ctx.orderbooks.contains_key(&symbol) {
289 return Err(format!("Market does not exist for symbol: {}", symbol));
290 }
291
292 let orderbook = self.ctx.orderbooks.get(&symbol).unwrap();
294 if orderbook.has_open_orders() {
295 return Err(format!(
296 "Cannot delete market with open orders for symbol: {}",
297 symbol
298 ));
299 }
300
301 if let Some(ref handler) = self.ctx.db {
302 if let Err(e) = handler.delete_market_and_instrument_sync(&symbol) {
303 panic!(
304 "CRITICAL_FAILURE: Failed to delete market {} from database: {}. \
305 In-memory and persisted state would diverge.",
306 symbol, e
307 );
308 }
309 }
310
311 self.ctx.orderbooks.remove(&symbol);
313 self.ctx.instrument_trading_modes.remove(&symbol);
314
315 self.expiry_manager.expiry_schedules.retain(|_, symbols| {
317 symbols.retain(|scheduled_symbol| scheduled_symbol != &symbol);
318 !symbols.is_empty()
319 });
320 self.ctx.expired_instruments.remove(&symbol);
321
322 info!("Deleted market for symbol: {}", symbol);
323 Ok(format!(
324 "Market deleted successfully for symbol: {}",
325 symbol
326 ))
327 }
328
329 pub fn apply_underlying_trading_mode_update(
345 &mut self,
346 update: &std::collections::HashMap<String, hypercall_types::TradingModes>,
347 ) {
348 if update.is_empty() {
349 return;
350 }
351 let mut changed = 0usize;
352 let mut checked = 0usize;
353 let symbols: Vec<String> = self.ctx.instrument_trading_modes.keys().cloned().collect();
354 for symbol in symbols {
355 checked += 1;
356 let Some(underlying) = symbol.split('-').next() else {
357 continue;
358 };
359 if let Some(new_mode) = update.get(underlying) {
360 let prev = self
361 .ctx
362 .instrument_trading_modes
363 .insert(symbol.clone(), *new_mode);
364 if prev.map(|p| p != *new_mode).unwrap_or(true) {
365 changed += 1;
366 info!(
367 symbol = %symbol,
368 underlying = %underlying,
369 old_mode = ?prev,
370 new_mode = ?new_mode,
371 "Applied live trading_mode update from catalog manager"
372 );
373 }
374 }
375 }
376 if changed > 0 {
377 info!(
378 changed,
379 checked,
380 update_size = update.len(),
381 "apply_underlying_trading_mode_update: updated live trading_modes"
382 );
383 }
384 }
385}