1use std::collections::HashMap;
17
18use nautilus_model::{
19 data::{BarType, DataType},
20 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
21};
22
23use super::core::{Endpoint, MStr, Topic};
24use crate::msgbus::get_message_bus;
25
26pub const CLOSE_TOPIC: &str = "CLOSE";
27
28#[must_use]
29pub fn get_custom_topic(data_type: &DataType) -> MStr<Topic> {
30 get_message_bus()
31 .borrow_mut()
32 .switchboard
33 .get_custom_topic(data_type)
34}
35
36#[must_use]
37pub fn get_instruments_topic(venue: Venue) -> MStr<Topic> {
38 get_message_bus()
39 .borrow_mut()
40 .switchboard
41 .get_instruments_topic(venue)
42}
43
44#[must_use]
45pub fn get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic> {
46 get_message_bus()
47 .borrow_mut()
48 .switchboard
49 .get_instrument_topic(instrument_id)
50}
51
52#[must_use]
53pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic> {
54 get_message_bus()
55 .borrow_mut()
56 .switchboard
57 .get_book_deltas_topic(instrument_id)
58}
59
60#[must_use]
61pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic> {
62 get_message_bus()
63 .borrow_mut()
64 .switchboard
65 .get_book_depth10_topic(instrument_id)
66}
67
68#[must_use]
69pub fn get_book_snapshots_topic(instrument_id: InstrumentId) -> MStr<Topic> {
70 get_message_bus()
71 .borrow_mut()
72 .switchboard
73 .get_book_snapshots_topic(instrument_id)
74}
75
76#[must_use]
77pub fn get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic> {
78 get_message_bus()
79 .borrow_mut()
80 .switchboard
81 .get_quotes_topic(instrument_id)
82}
83
84#[must_use]
85pub fn get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic> {
86 get_message_bus()
87 .borrow_mut()
88 .switchboard
89 .get_trades_topic(instrument_id)
90}
91
92#[must_use]
93pub fn get_bars_topic(bar_type: BarType) -> MStr<Topic> {
94 get_message_bus()
95 .borrow_mut()
96 .switchboard
97 .get_bars_topic(bar_type)
98}
99
100#[must_use]
101pub fn get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
102 get_message_bus()
103 .borrow_mut()
104 .switchboard
105 .get_mark_price_topic(instrument_id)
106}
107
108#[must_use]
109pub fn get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
110 get_message_bus()
111 .borrow_mut()
112 .switchboard
113 .get_index_price_topic(instrument_id)
114}
115
116#[must_use]
117pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic> {
118 get_message_bus()
119 .borrow_mut()
120 .switchboard
121 .get_instrument_status_topic(instrument_id)
122}
123
124#[must_use]
125pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic> {
126 get_message_bus()
127 .borrow_mut()
128 .switchboard
129 .get_instrument_close_topic(instrument_id)
130}
131
132#[must_use]
133pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic> {
134 get_message_bus()
135 .borrow_mut()
136 .switchboard
137 .get_order_snapshots_topic(client_order_id)
138}
139
140#[must_use]
141pub fn get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic> {
142 get_message_bus()
143 .borrow_mut()
144 .switchboard
145 .get_positions_snapshots_topic(position_id)
146}
147
148#[must_use]
149pub fn get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic> {
150 get_message_bus()
151 .borrow_mut()
152 .switchboard
153 .get_event_orders_topic(strategy_id)
154}
155
156#[must_use]
157pub fn get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic> {
158 get_message_bus()
159 .borrow_mut()
160 .switchboard
161 .get_event_positions_topic(strategy_id)
162}
163
164#[derive(Clone, Debug)]
166pub struct MessagingSwitchboard {
167 custom_topics: HashMap<DataType, MStr<Topic>>,
168 instruments_topics: HashMap<Venue, MStr<Topic>>,
169 instrument_topics: HashMap<InstrumentId, MStr<Topic>>,
170 book_deltas_topics: HashMap<InstrumentId, MStr<Topic>>,
171 book_depth10_topics: HashMap<InstrumentId, MStr<Topic>>,
172 book_snapshots_topics: HashMap<InstrumentId, MStr<Topic>>,
173 quote_topics: HashMap<InstrumentId, MStr<Topic>>,
174 trade_topics: HashMap<InstrumentId, MStr<Topic>>,
175 bar_topics: HashMap<BarType, MStr<Topic>>,
176 mark_price_topics: HashMap<InstrumentId, MStr<Topic>>,
177 index_price_topics: HashMap<InstrumentId, MStr<Topic>>,
178 instrument_status_topics: HashMap<InstrumentId, MStr<Topic>>,
179 instrument_close_topics: HashMap<InstrumentId, MStr<Topic>>,
180 event_orders_topics: HashMap<StrategyId, MStr<Topic>>,
181 event_positions_topics: HashMap<StrategyId, MStr<Topic>>,
182 order_snapshots_topics: HashMap<ClientOrderId, MStr<Topic>>,
183 positions_snapshots_topics: HashMap<PositionId, MStr<Topic>>,
184}
185
186impl Default for MessagingSwitchboard {
187 fn default() -> Self {
189 Self {
190 custom_topics: HashMap::new(),
191 instruments_topics: HashMap::new(),
192 instrument_topics: HashMap::new(),
193 book_deltas_topics: HashMap::new(),
194 book_snapshots_topics: HashMap::new(),
195 book_depth10_topics: HashMap::new(),
196 quote_topics: HashMap::new(),
197 trade_topics: HashMap::new(),
198 mark_price_topics: HashMap::new(),
199 index_price_topics: HashMap::new(),
200 bar_topics: HashMap::new(),
201 instrument_status_topics: HashMap::new(),
202 instrument_close_topics: HashMap::new(),
203 order_snapshots_topics: HashMap::new(),
204 event_orders_topics: HashMap::new(),
205 event_positions_topics: HashMap::new(),
206 positions_snapshots_topics: HashMap::new(),
207 }
208 }
209}
210
211impl MessagingSwitchboard {
212 #[must_use]
213 pub fn data_engine_execute() -> MStr<Endpoint> {
214 "DataEngine.execute".into()
215 }
216
217 #[must_use]
218 pub fn data_engine_process() -> MStr<Endpoint> {
219 "DataEngine.process".into()
220 }
221
222 #[must_use]
223 pub fn exec_engine_execute() -> MStr<Endpoint> {
224 "ExecEngine.execute".into()
225 }
226
227 #[must_use]
228 pub fn exec_engine_process() -> MStr<Endpoint> {
229 "ExecEngine.process".into()
230 }
231
232 #[must_use]
233 pub fn get_custom_topic(&mut self, data_type: &DataType) -> MStr<Topic> {
234 *self
235 .custom_topics
236 .entry(data_type.clone())
237 .or_insert_with(|| format!("data.{}", data_type.topic()).into())
238 }
239
240 #[must_use]
241 pub fn get_instruments_topic(&mut self, venue: Venue) -> MStr<Topic> {
242 *self
243 .instruments_topics
244 .entry(venue)
245 .or_insert_with(|| format!("data.instrument.{}", venue).into())
246 }
247
248 #[must_use]
249 pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
250 *self
251 .instrument_topics
252 .entry(instrument_id)
253 .or_insert_with(|| {
254 format!(
255 "data.instrument.{}.{}",
256 instrument_id.venue, instrument_id.symbol
257 )
258 .into()
259 })
260 }
261
262 #[must_use]
263 pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
264 *self
265 .book_deltas_topics
266 .entry(instrument_id)
267 .or_insert_with(|| {
268 format!(
269 "data.book.deltas.{}.{}",
270 instrument_id.venue, instrument_id.symbol
271 )
272 .into()
273 })
274 }
275
276 #[must_use]
277 pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
278 *self
279 .book_depth10_topics
280 .entry(instrument_id)
281 .or_insert_with(|| {
282 format!(
283 "data.book.depth10.{}.{}",
284 instrument_id.venue, instrument_id.symbol
285 )
286 .into()
287 })
288 }
289
290 #[must_use]
291 pub fn get_book_snapshots_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
292 *self
293 .book_snapshots_topics
294 .entry(instrument_id)
295 .or_insert_with(|| {
296 format!(
297 "data.book.snapshots.{}.{}",
298 instrument_id.venue, instrument_id.symbol
299 )
300 .into()
301 })
302 }
303
304 #[must_use]
305 pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
306 *self.quote_topics.entry(instrument_id).or_insert_with(|| {
307 format!(
308 "data.quotes.{}.{}",
309 instrument_id.venue, instrument_id.symbol
310 )
311 .into()
312 })
313 }
314
315 #[must_use]
316 pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
317 *self.trade_topics.entry(instrument_id).or_insert_with(|| {
318 format!(
319 "data.trades.{}.{}",
320 instrument_id.venue, instrument_id.symbol
321 )
322 .into()
323 })
324 }
325
326 #[must_use]
327 pub fn get_bars_topic(&mut self, bar_type: BarType) -> MStr<Topic> {
328 *self
329 .bar_topics
330 .entry(bar_type)
331 .or_insert_with(|| format!("data.bars.{bar_type}").into())
332 }
333
334 #[must_use]
335 pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
336 *self
337 .mark_price_topics
338 .entry(instrument_id)
339 .or_insert_with(|| {
340 format!(
341 "data.mark_prices.{}.{}",
342 instrument_id.venue, instrument_id.symbol
343 )
344 .into()
345 })
346 }
347
348 #[must_use]
349 pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
350 *self
351 .index_price_topics
352 .entry(instrument_id)
353 .or_insert_with(|| {
354 format!(
355 "data.index_prices.{}.{}",
356 instrument_id.venue, instrument_id.symbol
357 )
358 .into()
359 })
360 }
361
362 #[must_use]
363 pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
364 *self
365 .instrument_status_topics
366 .entry(instrument_id)
367 .or_insert_with(|| {
368 format!(
369 "data.status.{}.{}",
370 instrument_id.venue, instrument_id.symbol
371 )
372 .into()
373 })
374 }
375
376 #[must_use]
377 pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
378 *self
379 .instrument_close_topics
380 .entry(instrument_id)
381 .or_insert_with(|| {
382 format!(
383 "data.close.{}.{}",
384 instrument_id.venue, instrument_id.symbol
385 )
386 .into()
387 })
388 }
389
390 #[must_use]
391 pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> MStr<Topic> {
392 *self
393 .order_snapshots_topics
394 .entry(client_order_id)
395 .or_insert_with(|| format!("order.snapshots.{client_order_id}").into())
396 }
397
398 #[must_use]
399 pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> MStr<Topic> {
400 *self
401 .positions_snapshots_topics
402 .entry(position_id)
403 .or_insert_with(|| format!("positions.snapshots.{position_id}").into())
404 }
405
406 #[must_use]
407 pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
408 *self
409 .event_orders_topics
410 .entry(strategy_id)
411 .or_insert_with(|| format!("events.order.{strategy_id}").into())
412 }
413
414 #[must_use]
415 pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
416 *self
417 .event_positions_topics
418 .entry(strategy_id)
419 .or_insert_with(|| format!("events.position.{strategy_id}").into())
420 }
421}
422
423#[cfg(test)]
427mod tests {
428 use nautilus_model::{
429 data::{BarType, DataType},
430 identifiers::InstrumentId,
431 };
432 use rstest::*;
433
434 use super::*;
435
436 #[fixture]
437 fn switchboard() -> MessagingSwitchboard {
438 MessagingSwitchboard::default()
439 }
440
441 #[fixture]
442 fn instrument_id() -> InstrumentId {
443 InstrumentId::from("ESZ24.XCME")
444 }
445
446 #[rstest]
447 fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
448 let data_type = DataType::new("ExampleDataType", None);
449 let expected_topic = "data.ExampleDataType".into();
450 let result = switchboard.get_custom_topic(&data_type);
451 assert_eq!(result, expected_topic);
452 assert!(switchboard.custom_topics.contains_key(&data_type));
453 }
454
455 #[rstest]
456 fn test_get_instrument_topic(
457 mut switchboard: MessagingSwitchboard,
458 instrument_id: InstrumentId,
459 ) {
460 let expected_topic = "data.instrument.XCME.ESZ24".into();
461 let result = switchboard.get_instrument_topic(instrument_id);
462 assert_eq!(result, expected_topic);
463 assert!(switchboard.instrument_topics.contains_key(&instrument_id));
464 }
465
466 #[rstest]
467 fn test_get_book_deltas_topic(
468 mut switchboard: MessagingSwitchboard,
469 instrument_id: InstrumentId,
470 ) {
471 let expected_topic = "data.book.deltas.XCME.ESZ24".into();
472 let result = switchboard.get_book_deltas_topic(instrument_id);
473 assert_eq!(result, expected_topic);
474 assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
475 }
476
477 #[rstest]
478 fn test_get_book_depth10_topic(
479 mut switchboard: MessagingSwitchboard,
480 instrument_id: InstrumentId,
481 ) {
482 let expected_topic = "data.book.depth10.XCME.ESZ24".into();
483 let result = switchboard.get_book_depth10_topic(instrument_id);
484 assert_eq!(result, expected_topic);
485 assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
486 }
487
488 #[rstest]
489 fn test_get_book_snapshots_topic(
490 mut switchboard: MessagingSwitchboard,
491 instrument_id: InstrumentId,
492 ) {
493 let expected_topic = "data.book.snapshots.XCME.ESZ24".into();
494 let result = switchboard.get_book_snapshots_topic(instrument_id);
495 assert_eq!(result, expected_topic);
496 assert!(
497 switchboard
498 .book_snapshots_topics
499 .contains_key(&instrument_id)
500 );
501 }
502
503 #[rstest]
504 fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
505 let expected_topic = "data.quotes.XCME.ESZ24".into();
506 let result = switchboard.get_quotes_topic(instrument_id);
507 assert_eq!(result, expected_topic);
508 assert!(switchboard.quote_topics.contains_key(&instrument_id));
509 }
510
511 #[rstest]
512 fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
513 let expected_topic = "data.trades.XCME.ESZ24".into();
514 let result = switchboard.get_trades_topic(instrument_id);
515 assert_eq!(result, expected_topic);
516 assert!(switchboard.trade_topics.contains_key(&instrument_id));
517 }
518
519 #[rstest]
520 fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
521 let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
522 let expected_topic = format!("data.bars.{bar_type}").into();
523 let result = switchboard.get_bars_topic(bar_type);
524 assert_eq!(result, expected_topic);
525 assert!(switchboard.bar_topics.contains_key(&bar_type));
526 }
527
528 #[rstest]
529 fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
530 let client_order_id = ClientOrderId::from("O-123456789");
531 let expected_topic = format!("order.snapshots.{client_order_id}").into();
532 let result = switchboard.get_order_snapshots_topic(client_order_id);
533 assert_eq!(result, expected_topic);
534 assert!(
535 switchboard
536 .order_snapshots_topics
537 .contains_key(&client_order_id)
538 );
539 }
540}