1use std::num::NonZeroUsize;
17
18use ahash::AHashMap;
19#[cfg(feature = "defi")]
20use nautilus_model::defi::Blockchain;
21use nautilus_model::{
22 data::{BarType, DataType},
23 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
24};
25
26use super::core::{Endpoint, MStr, Topic};
27use crate::msgbus::get_message_bus;
28
/// Literal topic string `"CLOSE"`.
///
// NOTE(review): no use of this constant is visible in this section of the file;
// confirm its consumers before changing the value.
pub const CLOSE_TOPIC: &str = "CLOSE";
30
31#[must_use]
32pub fn get_custom_topic(data_type: &DataType) -> MStr<Topic> {
33 get_message_bus()
34 .borrow_mut()
35 .switchboard
36 .get_custom_topic(data_type)
37}
38
39#[must_use]
40pub fn get_instruments_topic(venue: Venue) -> MStr<Topic> {
41 get_message_bus()
42 .borrow_mut()
43 .switchboard
44 .get_instruments_topic(venue)
45}
46
47#[must_use]
48pub fn get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic> {
49 get_message_bus()
50 .borrow_mut()
51 .switchboard
52 .get_instrument_topic(instrument_id)
53}
54
55#[must_use]
56pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic> {
57 get_message_bus()
58 .borrow_mut()
59 .switchboard
60 .get_book_deltas_topic(instrument_id)
61}
62
63#[must_use]
64pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic> {
65 get_message_bus()
66 .borrow_mut()
67 .switchboard
68 .get_book_depth10_topic(instrument_id)
69}
70
71#[must_use]
72pub fn get_book_snapshots_topic(
73 instrument_id: InstrumentId,
74 interval_ms: NonZeroUsize,
75) -> MStr<Topic> {
76 get_message_bus()
77 .borrow_mut()
78 .switchboard
79 .get_book_snapshots_topic(instrument_id, interval_ms)
80}
81
82#[must_use]
83pub fn get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic> {
84 get_message_bus()
85 .borrow_mut()
86 .switchboard
87 .get_quotes_topic(instrument_id)
88}
89
90#[must_use]
91pub fn get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic> {
92 get_message_bus()
93 .borrow_mut()
94 .switchboard
95 .get_trades_topic(instrument_id)
96}
97
98#[must_use]
99pub fn get_bars_topic(bar_type: BarType) -> MStr<Topic> {
100 get_message_bus()
101 .borrow_mut()
102 .switchboard
103 .get_bars_topic(bar_type)
104}
105
106#[must_use]
107pub fn get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
108 get_message_bus()
109 .borrow_mut()
110 .switchboard
111 .get_mark_price_topic(instrument_id)
112}
113
114#[must_use]
115pub fn get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
116 get_message_bus()
117 .borrow_mut()
118 .switchboard
119 .get_index_price_topic(instrument_id)
120}
121
122#[must_use]
123pub fn get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic> {
124 get_message_bus()
125 .borrow_mut()
126 .switchboard
127 .get_funding_rate_topic(instrument_id)
128}
129
130#[must_use]
131pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic> {
132 get_message_bus()
133 .borrow_mut()
134 .switchboard
135 .get_instrument_status_topic(instrument_id)
136}
137
138#[must_use]
139pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic> {
140 get_message_bus()
141 .borrow_mut()
142 .switchboard
143 .get_instrument_close_topic(instrument_id)
144}
145
146#[must_use]
147pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic> {
148 get_message_bus()
149 .borrow_mut()
150 .switchboard
151 .get_order_snapshots_topic(client_order_id)
152}
153
154#[must_use]
155pub fn get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic> {
156 get_message_bus()
157 .borrow_mut()
158 .switchboard
159 .get_positions_snapshots_topic(position_id)
160}
161
162#[must_use]
163pub fn get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic> {
164 get_message_bus()
165 .borrow_mut()
166 .switchboard
167 .get_event_orders_topic(strategy_id)
168}
169
170#[must_use]
171pub fn get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic> {
172 get_message_bus()
173 .borrow_mut()
174 .switchboard
175 .get_event_positions_topic(strategy_id)
176}
177
/// Looks up the DeFi blocks topic for `chain` through the switchboard of the
/// message bus returned by `get_message_bus`.
///
/// The message bus is mutably borrowed for the duration of the lookup.
/// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_blocks_topic(chain: Blockchain) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_blocks_topic(chain)
}
186
/// Looks up the DeFi pool topic for `instrument_id` through the switchboard of
/// the message bus returned by `get_message_bus`.
///
/// The message bus is mutably borrowed for the duration of the lookup.
/// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_pool_topic(instrument_id: InstrumentId) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_pool_topic(instrument_id)
}
195
/// Looks up the DeFi pool swaps topic for `instrument_id` through the switchboard
/// of the message bus returned by `get_message_bus`.
///
/// The message bus is mutably borrowed for the duration of the lookup.
/// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_pool_swaps_topic(instrument_id: InstrumentId) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_pool_swaps_topic(instrument_id)
}
204
/// Looks up the DeFi pool liquidity topic for `instrument_id` through the
/// switchboard of the message bus returned by `get_message_bus`.
///
/// Delegates to the switchboard's `get_defi_pool_liquidity_topic`; note that this
/// wrapper's own name omits the `pool_` segment.
/// The message bus is mutably borrowed for the duration of the lookup.
/// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_liquidity_topic(instrument_id: InstrumentId) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_pool_liquidity_topic(instrument_id)
}
213
214#[derive(Clone, Debug)]
216pub struct MessagingSwitchboard {
217 custom_topics: AHashMap<DataType, MStr<Topic>>,
218 instruments_topics: AHashMap<Venue, MStr<Topic>>,
219 instrument_topics: AHashMap<InstrumentId, MStr<Topic>>,
220 book_deltas_topics: AHashMap<InstrumentId, MStr<Topic>>,
221 book_depth10_topics: AHashMap<InstrumentId, MStr<Topic>>,
222 book_snapshots_topics: AHashMap<InstrumentId, MStr<Topic>>,
223 quote_topics: AHashMap<InstrumentId, MStr<Topic>>,
224 trade_topics: AHashMap<InstrumentId, MStr<Topic>>,
225 bar_topics: AHashMap<BarType, MStr<Topic>>,
226 mark_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
227 index_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
228 funding_rate_topics: AHashMap<InstrumentId, MStr<Topic>>,
229 instrument_status_topics: AHashMap<InstrumentId, MStr<Topic>>,
230 instrument_close_topics: AHashMap<InstrumentId, MStr<Topic>>,
231 event_orders_topics: AHashMap<StrategyId, MStr<Topic>>,
232 event_positions_topics: AHashMap<StrategyId, MStr<Topic>>,
233 order_snapshots_topics: AHashMap<ClientOrderId, MStr<Topic>>,
234 positions_snapshots_topics: AHashMap<PositionId, MStr<Topic>>,
235 #[cfg(feature = "defi")]
236 defi_block_topics: AHashMap<Blockchain, MStr<Topic>>,
237 #[cfg(feature = "defi")]
238 defi_pool_topics: AHashMap<InstrumentId, MStr<Topic>>,
239 #[cfg(feature = "defi")]
240 defi_pool_swap_topics: AHashMap<InstrumentId, MStr<Topic>>,
241 #[cfg(feature = "defi")]
242 defi_pool_liquidity_topics: AHashMap<InstrumentId, MStr<Topic>>,
243}
244
245impl Default for MessagingSwitchboard {
246 fn default() -> Self {
248 Self {
249 custom_topics: AHashMap::new(),
250 instruments_topics: AHashMap::new(),
251 instrument_topics: AHashMap::new(),
252 book_deltas_topics: AHashMap::new(),
253 book_snapshots_topics: AHashMap::new(),
254 book_depth10_topics: AHashMap::new(),
255 quote_topics: AHashMap::new(),
256 trade_topics: AHashMap::new(),
257 mark_price_topics: AHashMap::new(),
258 index_price_topics: AHashMap::new(),
259 funding_rate_topics: AHashMap::new(),
260 bar_topics: AHashMap::new(),
261 instrument_status_topics: AHashMap::new(),
262 instrument_close_topics: AHashMap::new(),
263 order_snapshots_topics: AHashMap::new(),
264 event_orders_topics: AHashMap::new(),
265 event_positions_topics: AHashMap::new(),
266 positions_snapshots_topics: AHashMap::new(),
267 #[cfg(feature = "defi")]
268 defi_block_topics: AHashMap::new(),
269 #[cfg(feature = "defi")]
270 defi_pool_topics: AHashMap::new(),
271 #[cfg(feature = "defi")]
272 defi_pool_swap_topics: AHashMap::new(),
273 #[cfg(feature = "defi")]
274 defi_pool_liquidity_topics: AHashMap::new(),
275 }
276 }
277}
278
279impl MessagingSwitchboard {
280 #[must_use]
281 pub fn data_engine_queue_execute() -> MStr<Endpoint> {
282 "DataEngine.queue_execute".into()
283 }
284
285 #[must_use]
286 pub fn data_engine_execute() -> MStr<Endpoint> {
287 "DataEngine.execute".into()
288 }
289
290 #[must_use]
291 pub fn data_engine_process() -> MStr<Endpoint> {
292 "DataEngine.process".into()
293 }
294
295 #[must_use]
296 pub fn data_engine_response() -> MStr<Endpoint> {
297 "DataEngine.response".into()
298 }
299
300 #[must_use]
301 pub fn exec_engine_execute() -> MStr<Endpoint> {
302 "ExecEngine.execute".into()
303 }
304
305 #[must_use]
306 pub fn exec_engine_process() -> MStr<Endpoint> {
307 "ExecEngine.process".into()
308 }
309
310 #[must_use]
311 pub fn get_custom_topic(&mut self, data_type: &DataType) -> MStr<Topic> {
312 *self
313 .custom_topics
314 .entry(data_type.clone())
315 .or_insert_with(|| format!("data.{}", data_type.topic()).into())
316 }
317
318 #[must_use]
319 pub fn get_instruments_topic(&mut self, venue: Venue) -> MStr<Topic> {
320 *self
321 .instruments_topics
322 .entry(venue)
323 .or_insert_with(|| format!("data.instrument.{venue}").into())
324 }
325
326 #[must_use]
327 pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
328 *self
329 .instrument_topics
330 .entry(instrument_id)
331 .or_insert_with(|| {
332 format!(
333 "data.instrument.{}.{}",
334 instrument_id.venue, instrument_id.symbol
335 )
336 .into()
337 })
338 }
339
340 #[must_use]
341 pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
342 *self
343 .book_deltas_topics
344 .entry(instrument_id)
345 .or_insert_with(|| {
346 format!(
347 "data.book.deltas.{}.{}",
348 instrument_id.venue, instrument_id.symbol
349 )
350 .into()
351 })
352 }
353
354 #[must_use]
355 pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
356 *self
357 .book_depth10_topics
358 .entry(instrument_id)
359 .or_insert_with(|| {
360 format!(
361 "data.book.depth10.{}.{}",
362 instrument_id.venue, instrument_id.symbol
363 )
364 .into()
365 })
366 }
367
368 #[must_use]
369 pub fn get_book_snapshots_topic(
370 &mut self,
371 instrument_id: InstrumentId,
372 interval_ms: NonZeroUsize,
373 ) -> MStr<Topic> {
374 *self
375 .book_snapshots_topics
376 .entry(instrument_id)
377 .or_insert_with(|| {
378 format!(
379 "data.book.snapshots.{}.{}.{}",
380 instrument_id.venue, instrument_id.symbol, interval_ms
381 )
382 .into()
383 })
384 }
385
386 #[must_use]
387 pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
388 *self.quote_topics.entry(instrument_id).or_insert_with(|| {
389 format!(
390 "data.quotes.{}.{}",
391 instrument_id.venue, instrument_id.symbol
392 )
393 .into()
394 })
395 }
396
397 #[must_use]
398 pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
399 *self.trade_topics.entry(instrument_id).or_insert_with(|| {
400 format!(
401 "data.trades.{}.{}",
402 instrument_id.venue, instrument_id.symbol
403 )
404 .into()
405 })
406 }
407
408 #[must_use]
409 pub fn get_bars_topic(&mut self, bar_type: BarType) -> MStr<Topic> {
410 *self
411 .bar_topics
412 .entry(bar_type)
413 .or_insert_with(|| format!("data.bars.{bar_type}").into())
414 }
415
416 #[must_use]
417 pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
418 *self
419 .mark_price_topics
420 .entry(instrument_id)
421 .or_insert_with(|| {
422 format!(
423 "data.mark_prices.{}.{}",
424 instrument_id.venue, instrument_id.symbol
425 )
426 .into()
427 })
428 }
429
430 #[must_use]
431 pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
432 *self
433 .index_price_topics
434 .entry(instrument_id)
435 .or_insert_with(|| {
436 format!(
437 "data.index_prices.{}.{}",
438 instrument_id.venue, instrument_id.symbol
439 )
440 .into()
441 })
442 }
443
444 pub fn get_funding_rate_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
445 *self
446 .funding_rate_topics
447 .entry(instrument_id)
448 .or_insert_with(|| {
449 format!(
450 "data.funding_rates.{}.{}",
451 instrument_id.venue, instrument_id.symbol
452 )
453 .into()
454 })
455 }
456
457 #[must_use]
458 pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
459 *self
460 .instrument_status_topics
461 .entry(instrument_id)
462 .or_insert_with(|| {
463 format!(
464 "data.status.{}.{}",
465 instrument_id.venue, instrument_id.symbol
466 )
467 .into()
468 })
469 }
470
471 #[must_use]
472 pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
473 *self
474 .instrument_close_topics
475 .entry(instrument_id)
476 .or_insert_with(|| {
477 format!(
478 "data.close.{}.{}",
479 instrument_id.venue, instrument_id.symbol
480 )
481 .into()
482 })
483 }
484
485 #[must_use]
486 pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> MStr<Topic> {
487 *self
488 .order_snapshots_topics
489 .entry(client_order_id)
490 .or_insert_with(|| format!("order.snapshots.{client_order_id}").into())
491 }
492
493 #[must_use]
494 pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> MStr<Topic> {
495 *self
496 .positions_snapshots_topics
497 .entry(position_id)
498 .or_insert_with(|| format!("positions.snapshots.{position_id}").into())
499 }
500
501 #[must_use]
502 pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
503 *self
504 .event_orders_topics
505 .entry(strategy_id)
506 .or_insert_with(|| format!("events.order.{strategy_id}").into())
507 }
508
509 #[must_use]
510 pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
511 *self
512 .event_positions_topics
513 .entry(strategy_id)
514 .or_insert_with(|| format!("events.position.{strategy_id}").into())
515 }
516
517 #[cfg(feature = "defi")]
518 #[must_use]
519 pub fn get_defi_blocks_topic(&mut self, chain: Blockchain) -> MStr<Topic> {
520 *self
521 .defi_block_topics
522 .entry(chain)
523 .or_insert_with(|| format!("data.defi.blocks.{chain}").into())
524 }
525
526 #[cfg(feature = "defi")]
527 #[must_use]
528 pub fn get_defi_pool_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
529 *self
530 .defi_pool_topics
531 .entry(instrument_id)
532 .or_insert_with(|| format!("data.defi.pool.{instrument_id}").into())
533 }
534
535 #[cfg(feature = "defi")]
536 #[must_use]
537 pub fn get_defi_pool_swaps_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
538 *self
539 .defi_pool_swap_topics
540 .entry(instrument_id)
541 .or_insert_with(|| format!("data.defi.pool_swaps.{instrument_id}").into())
542 }
543
544 #[cfg(feature = "defi")]
545 #[must_use]
546 pub fn get_defi_pool_liquidity_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
547 *self
548 .defi_pool_liquidity_topics
549 .entry(instrument_id)
550 .or_insert_with(|| format!("data.defi.pool_liquidity.{instrument_id}").into())
551 }
552}
553
554#[cfg(test)]
558mod tests {
559 use nautilus_model::{
560 data::{BarType, DataType},
561 identifiers::InstrumentId,
562 };
563 use rstest::*;
564
565 use super::*;
566
567 #[fixture]
568 fn switchboard() -> MessagingSwitchboard {
569 MessagingSwitchboard::default()
570 }
571
572 #[fixture]
573 fn instrument_id() -> InstrumentId {
574 InstrumentId::from("ESZ24.XCME")
575 }
576
577 #[rstest]
578 fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
579 let data_type = DataType::new("ExampleDataType", None);
580 let expected_topic = "data.ExampleDataType".into();
581 let result = switchboard.get_custom_topic(&data_type);
582 assert_eq!(result, expected_topic);
583 assert!(switchboard.custom_topics.contains_key(&data_type));
584 }
585
586 #[rstest]
587 fn test_get_instrument_topic(
588 mut switchboard: MessagingSwitchboard,
589 instrument_id: InstrumentId,
590 ) {
591 let expected_topic = "data.instrument.XCME.ESZ24".into();
592 let result = switchboard.get_instrument_topic(instrument_id);
593 assert_eq!(result, expected_topic);
594 assert!(switchboard.instrument_topics.contains_key(&instrument_id));
595 }
596
597 #[rstest]
598 fn test_get_book_deltas_topic(
599 mut switchboard: MessagingSwitchboard,
600 instrument_id: InstrumentId,
601 ) {
602 let expected_topic = "data.book.deltas.XCME.ESZ24".into();
603 let result = switchboard.get_book_deltas_topic(instrument_id);
604 assert_eq!(result, expected_topic);
605 assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
606 }
607
608 #[rstest]
609 fn test_get_book_depth10_topic(
610 mut switchboard: MessagingSwitchboard,
611 instrument_id: InstrumentId,
612 ) {
613 let expected_topic = "data.book.depth10.XCME.ESZ24".into();
614 let result = switchboard.get_book_depth10_topic(instrument_id);
615 assert_eq!(result, expected_topic);
616 assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
617 }
618
619 #[rstest]
620 fn test_get_book_snapshots_topic(
621 mut switchboard: MessagingSwitchboard,
622 instrument_id: InstrumentId,
623 ) {
624 let expected_topic = "data.book.snapshots.XCME.ESZ24.1000".into();
625 let interval_ms = NonZeroUsize::new(1000).unwrap();
626 let result = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);
627 assert_eq!(result, expected_topic);
628 assert!(
629 switchboard
630 .book_snapshots_topics
631 .contains_key(&instrument_id)
632 );
633 }
634
635 #[rstest]
636 fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
637 let expected_topic = "data.quotes.XCME.ESZ24".into();
638 let result = switchboard.get_quotes_topic(instrument_id);
639 assert_eq!(result, expected_topic);
640 assert!(switchboard.quote_topics.contains_key(&instrument_id));
641 }
642
643 #[rstest]
644 fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
645 let expected_topic = "data.trades.XCME.ESZ24".into();
646 let result = switchboard.get_trades_topic(instrument_id);
647 assert_eq!(result, expected_topic);
648 assert!(switchboard.trade_topics.contains_key(&instrument_id));
649 }
650
651 #[rstest]
652 fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
653 let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
654 let expected_topic = format!("data.bars.{bar_type}").into();
655 let result = switchboard.get_bars_topic(bar_type);
656 assert_eq!(result, expected_topic);
657 assert!(switchboard.bar_topics.contains_key(&bar_type));
658 }
659
660 #[rstest]
661 fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
662 let client_order_id = ClientOrderId::from("O-123456789");
663 let expected_topic = format!("order.snapshots.{client_order_id}").into();
664 let result = switchboard.get_order_snapshots_topic(client_order_id);
665 assert_eq!(result, expected_topic);
666 assert!(
667 switchboard
668 .order_snapshots_topics
669 .contains_key(&client_order_id)
670 );
671 }
672}