1use std::num::NonZeroUsize;
17
18use ahash::AHashMap;
19#[cfg(feature = "defi")]
20use nautilus_model::defi::Blockchain;
21use nautilus_model::{
22 data::{BarType, DataType},
23 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
24};
25
26use super::core::{Endpoint, MStr, Topic};
27use crate::msgbus::get_message_bus;
28
/// Topic name constant with the literal value `"CLOSE"`.
pub const CLOSE_TOPIC: &str = "CLOSE";
30
31#[must_use]
32pub fn get_custom_topic(data_type: &DataType) -> MStr<Topic> {
33 get_message_bus()
34 .borrow_mut()
35 .switchboard
36 .get_custom_topic(data_type)
37}
38
39#[must_use]
40pub fn get_instruments_topic(venue: Venue) -> MStr<Topic> {
41 get_message_bus()
42 .borrow_mut()
43 .switchboard
44 .get_instruments_topic(venue)
45}
46
47#[must_use]
48pub fn get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic> {
49 get_message_bus()
50 .borrow_mut()
51 .switchboard
52 .get_instrument_topic(instrument_id)
53}
54
55#[must_use]
56pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic> {
57 get_message_bus()
58 .borrow_mut()
59 .switchboard
60 .get_book_deltas_topic(instrument_id)
61}
62
63#[must_use]
64pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic> {
65 get_message_bus()
66 .borrow_mut()
67 .switchboard
68 .get_book_depth10_topic(instrument_id)
69}
70
71#[must_use]
72pub fn get_book_snapshots_topic(
73 instrument_id: InstrumentId,
74 interval_ms: NonZeroUsize,
75) -> MStr<Topic> {
76 get_message_bus()
77 .borrow_mut()
78 .switchboard
79 .get_book_snapshots_topic(instrument_id, interval_ms)
80}
81
82#[must_use]
83pub fn get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic> {
84 get_message_bus()
85 .borrow_mut()
86 .switchboard
87 .get_quotes_topic(instrument_id)
88}
89
90#[must_use]
91pub fn get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic> {
92 get_message_bus()
93 .borrow_mut()
94 .switchboard
95 .get_trades_topic(instrument_id)
96}
97
98#[must_use]
99pub fn get_bars_topic(bar_type: BarType) -> MStr<Topic> {
100 get_message_bus()
101 .borrow_mut()
102 .switchboard
103 .get_bars_topic(bar_type)
104}
105
106#[must_use]
107pub fn get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
108 get_message_bus()
109 .borrow_mut()
110 .switchboard
111 .get_mark_price_topic(instrument_id)
112}
113
114#[must_use]
115pub fn get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
116 get_message_bus()
117 .borrow_mut()
118 .switchboard
119 .get_index_price_topic(instrument_id)
120}
121
122#[must_use]
123pub fn get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic> {
124 get_message_bus()
125 .borrow_mut()
126 .switchboard
127 .get_funding_rate_topic(instrument_id)
128}
129
130#[must_use]
131pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic> {
132 get_message_bus()
133 .borrow_mut()
134 .switchboard
135 .get_instrument_status_topic(instrument_id)
136}
137
138#[must_use]
139pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic> {
140 get_message_bus()
141 .borrow_mut()
142 .switchboard
143 .get_instrument_close_topic(instrument_id)
144}
145
146#[must_use]
147pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic> {
148 get_message_bus()
149 .borrow_mut()
150 .switchboard
151 .get_order_snapshots_topic(client_order_id)
152}
153
154#[must_use]
155pub fn get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic> {
156 get_message_bus()
157 .borrow_mut()
158 .switchboard
159 .get_positions_snapshots_topic(position_id)
160}
161
162#[must_use]
163pub fn get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic> {
164 get_message_bus()
165 .borrow_mut()
166 .switchboard
167 .get_event_orders_topic(strategy_id)
168}
169
170#[must_use]
171pub fn get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic> {
172 get_message_bus()
173 .borrow_mut()
174 .switchboard
175 .get_event_positions_topic(strategy_id)
176}
177
/// Returns the DeFi blocks topic for `chain`.
///
/// Mutably borrows the message bus returned by `get_message_bus` for the
/// duration of the call and delegates to
/// `MessagingSwitchboard::get_defi_blocks_topic` on its switchboard.
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_blocks_topic(chain: Blockchain) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_blocks_topic(chain)
}
186
/// Returns the DeFi pool topic for `instrument_id`.
///
/// Mutably borrows the message bus returned by `get_message_bus` for the
/// duration of the call and delegates to
/// `MessagingSwitchboard::get_defi_pool_topic` on its switchboard.
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_pool_topic(instrument_id: InstrumentId) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_pool_topic(instrument_id)
}
195
/// Returns the DeFi pool swaps topic for `instrument_id`.
///
/// Mutably borrows the message bus returned by `get_message_bus` for the
/// duration of the call and delegates to
/// `MessagingSwitchboard::get_defi_pool_swaps_topic` on its switchboard.
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_pool_swaps_topic(instrument_id: InstrumentId) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_pool_swaps_topic(instrument_id)
}
204
/// Returns the DeFi pool liquidity topic for `instrument_id`.
///
/// Mutably borrows the message bus returned by `get_message_bus` for the
/// duration of the call and delegates to
/// `MessagingSwitchboard::get_defi_pool_liquidity_topic` on its switchboard.
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_liquidity_topic(instrument_id: InstrumentId) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_pool_liquidity_topic(instrument_id)
}
213
/// Returns the DeFi pool collect topic for `instrument_id`.
///
/// Mutably borrows the message bus returned by `get_message_bus` for the
/// duration of the call and delegates to
/// `MessagingSwitchboard::get_defi_pool_collect_topics` on its switchboard.
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_collect_topic(instrument_id: InstrumentId) -> MStr<Topic> {
    // Must resolve through the collect cache; the liquidity getter would
    // hand back the `data.defi.pool_liquidity.*` topic instead.
    get_message_bus()
        .borrow_mut()
        .switchboard
        .get_defi_pool_collect_topics(instrument_id)
}
222
223#[derive(Clone, Debug)]
225pub struct MessagingSwitchboard {
226 custom_topics: AHashMap<DataType, MStr<Topic>>,
227 instruments_topics: AHashMap<Venue, MStr<Topic>>,
228 instrument_topics: AHashMap<InstrumentId, MStr<Topic>>,
229 book_deltas_topics: AHashMap<InstrumentId, MStr<Topic>>,
230 book_depth10_topics: AHashMap<InstrumentId, MStr<Topic>>,
231 book_snapshots_topics: AHashMap<InstrumentId, MStr<Topic>>,
232 quote_topics: AHashMap<InstrumentId, MStr<Topic>>,
233 trade_topics: AHashMap<InstrumentId, MStr<Topic>>,
234 bar_topics: AHashMap<BarType, MStr<Topic>>,
235 mark_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
236 index_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
237 funding_rate_topics: AHashMap<InstrumentId, MStr<Topic>>,
238 instrument_status_topics: AHashMap<InstrumentId, MStr<Topic>>,
239 instrument_close_topics: AHashMap<InstrumentId, MStr<Topic>>,
240 event_orders_topics: AHashMap<StrategyId, MStr<Topic>>,
241 event_positions_topics: AHashMap<StrategyId, MStr<Topic>>,
242 order_snapshots_topics: AHashMap<ClientOrderId, MStr<Topic>>,
243 positions_snapshots_topics: AHashMap<PositionId, MStr<Topic>>,
244 #[cfg(feature = "defi")]
245 defi_block_topics: AHashMap<Blockchain, MStr<Topic>>,
246 #[cfg(feature = "defi")]
247 defi_pool_topics: AHashMap<InstrumentId, MStr<Topic>>,
248 #[cfg(feature = "defi")]
249 defi_pool_swap_topics: AHashMap<InstrumentId, MStr<Topic>>,
250 #[cfg(feature = "defi")]
251 defi_pool_liquidity_topics: AHashMap<InstrumentId, MStr<Topic>>,
252 #[cfg(feature = "defi")]
253 defi_pool_collect_topics: AHashMap<InstrumentId, MStr<Topic>>,
254}
255
256impl Default for MessagingSwitchboard {
257 fn default() -> Self {
259 Self {
260 custom_topics: AHashMap::new(),
261 instruments_topics: AHashMap::new(),
262 instrument_topics: AHashMap::new(),
263 book_deltas_topics: AHashMap::new(),
264 book_snapshots_topics: AHashMap::new(),
265 book_depth10_topics: AHashMap::new(),
266 quote_topics: AHashMap::new(),
267 trade_topics: AHashMap::new(),
268 mark_price_topics: AHashMap::new(),
269 index_price_topics: AHashMap::new(),
270 funding_rate_topics: AHashMap::new(),
271 bar_topics: AHashMap::new(),
272 instrument_status_topics: AHashMap::new(),
273 instrument_close_topics: AHashMap::new(),
274 order_snapshots_topics: AHashMap::new(),
275 event_orders_topics: AHashMap::new(),
276 event_positions_topics: AHashMap::new(),
277 positions_snapshots_topics: AHashMap::new(),
278 #[cfg(feature = "defi")]
279 defi_block_topics: AHashMap::new(),
280 #[cfg(feature = "defi")]
281 defi_pool_topics: AHashMap::new(),
282 #[cfg(feature = "defi")]
283 defi_pool_swap_topics: AHashMap::new(),
284 #[cfg(feature = "defi")]
285 defi_pool_liquidity_topics: AHashMap::new(),
286 #[cfg(feature = "defi")]
287 defi_pool_collect_topics: AHashMap::new(),
288 }
289 }
290}
291
292impl MessagingSwitchboard {
293 #[must_use]
294 pub fn data_engine_queue_execute() -> MStr<Endpoint> {
295 "DataEngine.queue_execute".into()
296 }
297
298 #[must_use]
299 pub fn data_engine_execute() -> MStr<Endpoint> {
300 "DataEngine.execute".into()
301 }
302
303 #[must_use]
304 pub fn data_engine_process() -> MStr<Endpoint> {
305 "DataEngine.process".into()
306 }
307
308 #[must_use]
309 pub fn data_engine_response() -> MStr<Endpoint> {
310 "DataEngine.response".into()
311 }
312
313 #[must_use]
314 pub fn exec_engine_execute() -> MStr<Endpoint> {
315 "ExecEngine.execute".into()
316 }
317
318 #[must_use]
319 pub fn exec_engine_process() -> MStr<Endpoint> {
320 "ExecEngine.process".into()
321 }
322
323 #[must_use]
324 pub fn get_custom_topic(&mut self, data_type: &DataType) -> MStr<Topic> {
325 *self
326 .custom_topics
327 .entry(data_type.clone())
328 .or_insert_with(|| format!("data.{}", data_type.topic()).into())
329 }
330
331 #[must_use]
332 pub fn get_instruments_topic(&mut self, venue: Venue) -> MStr<Topic> {
333 *self
334 .instruments_topics
335 .entry(venue)
336 .or_insert_with(|| format!("data.instrument.{venue}").into())
337 }
338
339 #[must_use]
340 pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
341 *self
342 .instrument_topics
343 .entry(instrument_id)
344 .or_insert_with(|| {
345 format!(
346 "data.instrument.{}.{}",
347 instrument_id.venue, instrument_id.symbol
348 )
349 .into()
350 })
351 }
352
353 #[must_use]
354 pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
355 *self
356 .book_deltas_topics
357 .entry(instrument_id)
358 .or_insert_with(|| {
359 format!(
360 "data.book.deltas.{}.{}",
361 instrument_id.venue, instrument_id.symbol
362 )
363 .into()
364 })
365 }
366
367 #[must_use]
368 pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
369 *self
370 .book_depth10_topics
371 .entry(instrument_id)
372 .or_insert_with(|| {
373 format!(
374 "data.book.depth10.{}.{}",
375 instrument_id.venue, instrument_id.symbol
376 )
377 .into()
378 })
379 }
380
381 #[must_use]
382 pub fn get_book_snapshots_topic(
383 &mut self,
384 instrument_id: InstrumentId,
385 interval_ms: NonZeroUsize,
386 ) -> MStr<Topic> {
387 *self
388 .book_snapshots_topics
389 .entry(instrument_id)
390 .or_insert_with(|| {
391 format!(
392 "data.book.snapshots.{}.{}.{}",
393 instrument_id.venue, instrument_id.symbol, interval_ms
394 )
395 .into()
396 })
397 }
398
399 #[must_use]
400 pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
401 *self.quote_topics.entry(instrument_id).or_insert_with(|| {
402 format!(
403 "data.quotes.{}.{}",
404 instrument_id.venue, instrument_id.symbol
405 )
406 .into()
407 })
408 }
409
410 #[must_use]
411 pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
412 *self.trade_topics.entry(instrument_id).or_insert_with(|| {
413 format!(
414 "data.trades.{}.{}",
415 instrument_id.venue, instrument_id.symbol
416 )
417 .into()
418 })
419 }
420
421 #[must_use]
422 pub fn get_bars_topic(&mut self, bar_type: BarType) -> MStr<Topic> {
423 *self
424 .bar_topics
425 .entry(bar_type)
426 .or_insert_with(|| format!("data.bars.{bar_type}").into())
427 }
428
429 #[must_use]
430 pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
431 *self
432 .mark_price_topics
433 .entry(instrument_id)
434 .or_insert_with(|| {
435 format!(
436 "data.mark_prices.{}.{}",
437 instrument_id.venue, instrument_id.symbol
438 )
439 .into()
440 })
441 }
442
443 #[must_use]
444 pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
445 *self
446 .index_price_topics
447 .entry(instrument_id)
448 .or_insert_with(|| {
449 format!(
450 "data.index_prices.{}.{}",
451 instrument_id.venue, instrument_id.symbol
452 )
453 .into()
454 })
455 }
456
457 pub fn get_funding_rate_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
458 *self
459 .funding_rate_topics
460 .entry(instrument_id)
461 .or_insert_with(|| {
462 format!(
463 "data.funding_rates.{}.{}",
464 instrument_id.venue, instrument_id.symbol
465 )
466 .into()
467 })
468 }
469
470 #[must_use]
471 pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
472 *self
473 .instrument_status_topics
474 .entry(instrument_id)
475 .or_insert_with(|| {
476 format!(
477 "data.status.{}.{}",
478 instrument_id.venue, instrument_id.symbol
479 )
480 .into()
481 })
482 }
483
484 #[must_use]
485 pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
486 *self
487 .instrument_close_topics
488 .entry(instrument_id)
489 .or_insert_with(|| {
490 format!(
491 "data.close.{}.{}",
492 instrument_id.venue, instrument_id.symbol
493 )
494 .into()
495 })
496 }
497
498 #[must_use]
499 pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> MStr<Topic> {
500 *self
501 .order_snapshots_topics
502 .entry(client_order_id)
503 .or_insert_with(|| format!("order.snapshots.{client_order_id}").into())
504 }
505
506 #[must_use]
507 pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> MStr<Topic> {
508 *self
509 .positions_snapshots_topics
510 .entry(position_id)
511 .or_insert_with(|| format!("positions.snapshots.{position_id}").into())
512 }
513
514 #[must_use]
515 pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
516 *self
517 .event_orders_topics
518 .entry(strategy_id)
519 .or_insert_with(|| format!("events.order.{strategy_id}").into())
520 }
521
522 #[must_use]
523 pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
524 *self
525 .event_positions_topics
526 .entry(strategy_id)
527 .or_insert_with(|| format!("events.position.{strategy_id}").into())
528 }
529
530 #[cfg(feature = "defi")]
531 #[must_use]
532 pub fn get_defi_blocks_topic(&mut self, chain: Blockchain) -> MStr<Topic> {
533 *self
534 .defi_block_topics
535 .entry(chain)
536 .or_insert_with(|| format!("data.defi.blocks.{chain}").into())
537 }
538
539 #[cfg(feature = "defi")]
540 #[must_use]
541 pub fn get_defi_pool_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
542 *self
543 .defi_pool_topics
544 .entry(instrument_id)
545 .or_insert_with(|| format!("data.defi.pool.{instrument_id}").into())
546 }
547
548 #[cfg(feature = "defi")]
549 #[must_use]
550 pub fn get_defi_pool_swaps_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
551 *self
552 .defi_pool_swap_topics
553 .entry(instrument_id)
554 .or_insert_with(|| format!("data.defi.pool_swaps.{instrument_id}").into())
555 }
556
557 #[cfg(feature = "defi")]
558 #[must_use]
559 pub fn get_defi_pool_liquidity_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
560 *self
561 .defi_pool_liquidity_topics
562 .entry(instrument_id)
563 .or_insert_with(|| format!("data.defi.pool_liquidity.{instrument_id}").into())
564 }
565
566 #[cfg(feature = "defi")]
567 #[must_use]
568 pub fn get_defi_pool_collect_topics(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
569 *self
570 .defi_pool_collect_topics
571 .entry(instrument_id)
572 .or_insert_with(|| format!("data.defi.pool_collect.{instrument_id}").into())
573 }
574}
575
576#[cfg(test)]
580mod tests {
581 use nautilus_model::{
582 data::{BarType, DataType},
583 identifiers::InstrumentId,
584 };
585 use rstest::*;
586
587 use super::*;
588
589 #[fixture]
590 fn switchboard() -> MessagingSwitchboard {
591 MessagingSwitchboard::default()
592 }
593
594 #[fixture]
595 fn instrument_id() -> InstrumentId {
596 InstrumentId::from("ESZ24.XCME")
597 }
598
599 #[rstest]
600 fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
601 let data_type = DataType::new("ExampleDataType", None);
602 let expected_topic = "data.ExampleDataType".into();
603 let result = switchboard.get_custom_topic(&data_type);
604 assert_eq!(result, expected_topic);
605 assert!(switchboard.custom_topics.contains_key(&data_type));
606 }
607
608 #[rstest]
609 fn test_get_instrument_topic(
610 mut switchboard: MessagingSwitchboard,
611 instrument_id: InstrumentId,
612 ) {
613 let expected_topic = "data.instrument.XCME.ESZ24".into();
614 let result = switchboard.get_instrument_topic(instrument_id);
615 assert_eq!(result, expected_topic);
616 assert!(switchboard.instrument_topics.contains_key(&instrument_id));
617 }
618
619 #[rstest]
620 fn test_get_book_deltas_topic(
621 mut switchboard: MessagingSwitchboard,
622 instrument_id: InstrumentId,
623 ) {
624 let expected_topic = "data.book.deltas.XCME.ESZ24".into();
625 let result = switchboard.get_book_deltas_topic(instrument_id);
626 assert_eq!(result, expected_topic);
627 assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
628 }
629
630 #[rstest]
631 fn test_get_book_depth10_topic(
632 mut switchboard: MessagingSwitchboard,
633 instrument_id: InstrumentId,
634 ) {
635 let expected_topic = "data.book.depth10.XCME.ESZ24".into();
636 let result = switchboard.get_book_depth10_topic(instrument_id);
637 assert_eq!(result, expected_topic);
638 assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
639 }
640
641 #[rstest]
642 fn test_get_book_snapshots_topic(
643 mut switchboard: MessagingSwitchboard,
644 instrument_id: InstrumentId,
645 ) {
646 let expected_topic = "data.book.snapshots.XCME.ESZ24.1000".into();
647 let interval_ms = NonZeroUsize::new(1000).unwrap();
648 let result = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);
649 assert_eq!(result, expected_topic);
650 assert!(
651 switchboard
652 .book_snapshots_topics
653 .contains_key(&instrument_id)
654 );
655 }
656
657 #[rstest]
658 fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
659 let expected_topic = "data.quotes.XCME.ESZ24".into();
660 let result = switchboard.get_quotes_topic(instrument_id);
661 assert_eq!(result, expected_topic);
662 assert!(switchboard.quote_topics.contains_key(&instrument_id));
663 }
664
665 #[rstest]
666 fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
667 let expected_topic = "data.trades.XCME.ESZ24".into();
668 let result = switchboard.get_trades_topic(instrument_id);
669 assert_eq!(result, expected_topic);
670 assert!(switchboard.trade_topics.contains_key(&instrument_id));
671 }
672
673 #[rstest]
674 fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
675 let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
676 let expected_topic = format!("data.bars.{bar_type}").into();
677 let result = switchboard.get_bars_topic(bar_type);
678 assert_eq!(result, expected_topic);
679 assert!(switchboard.bar_topics.contains_key(&bar_type));
680 }
681
682 #[rstest]
683 fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
684 let client_order_id = ClientOrderId::from("O-123456789");
685 let expected_topic = format!("order.snapshots.{client_order_id}").into();
686 let result = switchboard.get_order_snapshots_topic(client_order_id);
687 assert_eq!(result, expected_topic);
688 assert!(
689 switchboard
690 .order_snapshots_topics
691 .contains_key(&client_order_id)
692 );
693 }
694}