nautilus_common/msgbus/
switchboard.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::num::NonZeroUsize;
17
18use ahash::AHashMap;
19#[cfg(feature = "defi")]
20use nautilus_model::defi::Blockchain;
21use nautilus_model::{
22    data::{BarType, DataType},
23    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
24};
25
26use super::core::{Endpoint, MStr, Topic};
27use crate::msgbus::get_message_bus;
28
29pub const CLOSE_TOPIC: &str = "CLOSE";
30
31#[must_use]
32pub fn get_custom_topic(data_type: &DataType) -> MStr<Topic> {
33    get_message_bus()
34        .borrow_mut()
35        .switchboard
36        .get_custom_topic(data_type)
37}
38
39#[must_use]
40pub fn get_instruments_topic(venue: Venue) -> MStr<Topic> {
41    get_message_bus()
42        .borrow_mut()
43        .switchboard
44        .get_instruments_topic(venue)
45}
46
47#[must_use]
48pub fn get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic> {
49    get_message_bus()
50        .borrow_mut()
51        .switchboard
52        .get_instrument_topic(instrument_id)
53}
54
55#[must_use]
56pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic> {
57    get_message_bus()
58        .borrow_mut()
59        .switchboard
60        .get_book_deltas_topic(instrument_id)
61}
62
63#[must_use]
64pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic> {
65    get_message_bus()
66        .borrow_mut()
67        .switchboard
68        .get_book_depth10_topic(instrument_id)
69}
70
71#[must_use]
72pub fn get_book_snapshots_topic(
73    instrument_id: InstrumentId,
74    interval_ms: NonZeroUsize,
75) -> MStr<Topic> {
76    get_message_bus()
77        .borrow_mut()
78        .switchboard
79        .get_book_snapshots_topic(instrument_id, interval_ms)
80}
81
82#[must_use]
83pub fn get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic> {
84    get_message_bus()
85        .borrow_mut()
86        .switchboard
87        .get_quotes_topic(instrument_id)
88}
89
90#[must_use]
91pub fn get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic> {
92    get_message_bus()
93        .borrow_mut()
94        .switchboard
95        .get_trades_topic(instrument_id)
96}
97
98#[must_use]
99pub fn get_bars_topic(bar_type: BarType) -> MStr<Topic> {
100    get_message_bus()
101        .borrow_mut()
102        .switchboard
103        .get_bars_topic(bar_type)
104}
105
106#[must_use]
107pub fn get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
108    get_message_bus()
109        .borrow_mut()
110        .switchboard
111        .get_mark_price_topic(instrument_id)
112}
113
114#[must_use]
115pub fn get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
116    get_message_bus()
117        .borrow_mut()
118        .switchboard
119        .get_index_price_topic(instrument_id)
120}
121
122#[must_use]
123pub fn get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic> {
124    get_message_bus()
125        .borrow_mut()
126        .switchboard
127        .get_funding_rate_topic(instrument_id)
128}
129
130#[must_use]
131pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic> {
132    get_message_bus()
133        .borrow_mut()
134        .switchboard
135        .get_instrument_status_topic(instrument_id)
136}
137
138#[must_use]
139pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic> {
140    get_message_bus()
141        .borrow_mut()
142        .switchboard
143        .get_instrument_close_topic(instrument_id)
144}
145
146#[must_use]
147pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic> {
148    get_message_bus()
149        .borrow_mut()
150        .switchboard
151        .get_order_snapshots_topic(client_order_id)
152}
153
154#[must_use]
155pub fn get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic> {
156    get_message_bus()
157        .borrow_mut()
158        .switchboard
159        .get_positions_snapshots_topic(position_id)
160}
161
162#[must_use]
163pub fn get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic> {
164    get_message_bus()
165        .borrow_mut()
166        .switchboard
167        .get_event_orders_topic(strategy_id)
168}
169
170#[must_use]
171pub fn get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic> {
172    get_message_bus()
173        .borrow_mut()
174        .switchboard
175        .get_event_positions_topic(strategy_id)
176}
177
178#[cfg(feature = "defi")]
179#[must_use]
180pub fn get_defi_blocks_topic(chain: Blockchain) -> MStr<Topic> {
181    get_message_bus()
182        .borrow_mut()
183        .switchboard
184        .get_defi_blocks_topic(chain)
185}
186
187#[cfg(feature = "defi")]
188#[must_use]
189pub fn get_defi_pool_topic(instrument_id: InstrumentId) -> MStr<Topic> {
190    get_message_bus()
191        .borrow_mut()
192        .switchboard
193        .get_defi_pool_topic(instrument_id)
194}
195
196#[cfg(feature = "defi")]
197#[must_use]
198pub fn get_defi_pool_swaps_topic(instrument_id: InstrumentId) -> MStr<Topic> {
199    get_message_bus()
200        .borrow_mut()
201        .switchboard
202        .get_defi_pool_swaps_topic(instrument_id)
203}
204
205#[cfg(feature = "defi")]
206#[must_use]
207pub fn get_defi_liquidity_topic(instrument_id: InstrumentId) -> MStr<Topic> {
208    get_message_bus()
209        .borrow_mut()
210        .switchboard
211        .get_defi_pool_liquidity_topic(instrument_id)
212}
213
214/// Represents a switchboard of built-in messaging endpoint names.
215#[derive(Clone, Debug)]
216pub struct MessagingSwitchboard {
217    custom_topics: AHashMap<DataType, MStr<Topic>>,
218    instruments_topics: AHashMap<Venue, MStr<Topic>>,
219    instrument_topics: AHashMap<InstrumentId, MStr<Topic>>,
220    book_deltas_topics: AHashMap<InstrumentId, MStr<Topic>>,
221    book_depth10_topics: AHashMap<InstrumentId, MStr<Topic>>,
222    book_snapshots_topics: AHashMap<InstrumentId, MStr<Topic>>,
223    quote_topics: AHashMap<InstrumentId, MStr<Topic>>,
224    trade_topics: AHashMap<InstrumentId, MStr<Topic>>,
225    bar_topics: AHashMap<BarType, MStr<Topic>>,
226    mark_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
227    index_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
228    funding_rate_topics: AHashMap<InstrumentId, MStr<Topic>>,
229    instrument_status_topics: AHashMap<InstrumentId, MStr<Topic>>,
230    instrument_close_topics: AHashMap<InstrumentId, MStr<Topic>>,
231    event_orders_topics: AHashMap<StrategyId, MStr<Topic>>,
232    event_positions_topics: AHashMap<StrategyId, MStr<Topic>>,
233    order_snapshots_topics: AHashMap<ClientOrderId, MStr<Topic>>,
234    positions_snapshots_topics: AHashMap<PositionId, MStr<Topic>>,
235    #[cfg(feature = "defi")]
236    defi_block_topics: AHashMap<Blockchain, MStr<Topic>>,
237    #[cfg(feature = "defi")]
238    defi_pool_topics: AHashMap<InstrumentId, MStr<Topic>>,
239    #[cfg(feature = "defi")]
240    defi_pool_swap_topics: AHashMap<InstrumentId, MStr<Topic>>,
241    #[cfg(feature = "defi")]
242    defi_pool_liquidity_topics: AHashMap<InstrumentId, MStr<Topic>>,
243}
244
245impl Default for MessagingSwitchboard {
246    /// Creates a new default [`MessagingSwitchboard`] instance.
247    fn default() -> Self {
248        Self {
249            custom_topics: AHashMap::new(),
250            instruments_topics: AHashMap::new(),
251            instrument_topics: AHashMap::new(),
252            book_deltas_topics: AHashMap::new(),
253            book_snapshots_topics: AHashMap::new(),
254            book_depth10_topics: AHashMap::new(),
255            quote_topics: AHashMap::new(),
256            trade_topics: AHashMap::new(),
257            mark_price_topics: AHashMap::new(),
258            index_price_topics: AHashMap::new(),
259            funding_rate_topics: AHashMap::new(),
260            bar_topics: AHashMap::new(),
261            instrument_status_topics: AHashMap::new(),
262            instrument_close_topics: AHashMap::new(),
263            order_snapshots_topics: AHashMap::new(),
264            event_orders_topics: AHashMap::new(),
265            event_positions_topics: AHashMap::new(),
266            positions_snapshots_topics: AHashMap::new(),
267            #[cfg(feature = "defi")]
268            defi_block_topics: AHashMap::new(),
269            #[cfg(feature = "defi")]
270            defi_pool_topics: AHashMap::new(),
271            #[cfg(feature = "defi")]
272            defi_pool_swap_topics: AHashMap::new(),
273            #[cfg(feature = "defi")]
274            defi_pool_liquidity_topics: AHashMap::new(),
275        }
276    }
277}
278
279impl MessagingSwitchboard {
280    #[must_use]
281    pub fn data_engine_queue_execute() -> MStr<Endpoint> {
282        "DataEngine.queue_execute".into()
283    }
284
285    #[must_use]
286    pub fn data_engine_execute() -> MStr<Endpoint> {
287        "DataEngine.execute".into()
288    }
289
290    #[must_use]
291    pub fn data_engine_process() -> MStr<Endpoint> {
292        "DataEngine.process".into()
293    }
294
295    #[must_use]
296    pub fn data_engine_response() -> MStr<Endpoint> {
297        "DataEngine.response".into()
298    }
299
300    #[must_use]
301    pub fn exec_engine_execute() -> MStr<Endpoint> {
302        "ExecEngine.execute".into()
303    }
304
305    #[must_use]
306    pub fn exec_engine_process() -> MStr<Endpoint> {
307        "ExecEngine.process".into()
308    }
309
310    #[must_use]
311    pub fn get_custom_topic(&mut self, data_type: &DataType) -> MStr<Topic> {
312        *self
313            .custom_topics
314            .entry(data_type.clone())
315            .or_insert_with(|| format!("data.{}", data_type.topic()).into())
316    }
317
318    #[must_use]
319    pub fn get_instruments_topic(&mut self, venue: Venue) -> MStr<Topic> {
320        *self
321            .instruments_topics
322            .entry(venue)
323            .or_insert_with(|| format!("data.instrument.{venue}").into())
324    }
325
326    #[must_use]
327    pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
328        *self
329            .instrument_topics
330            .entry(instrument_id)
331            .or_insert_with(|| {
332                format!(
333                    "data.instrument.{}.{}",
334                    instrument_id.venue, instrument_id.symbol
335                )
336                .into()
337            })
338    }
339
340    #[must_use]
341    pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
342        *self
343            .book_deltas_topics
344            .entry(instrument_id)
345            .or_insert_with(|| {
346                format!(
347                    "data.book.deltas.{}.{}",
348                    instrument_id.venue, instrument_id.symbol
349                )
350                .into()
351            })
352    }
353
354    #[must_use]
355    pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
356        *self
357            .book_depth10_topics
358            .entry(instrument_id)
359            .or_insert_with(|| {
360                format!(
361                    "data.book.depth10.{}.{}",
362                    instrument_id.venue, instrument_id.symbol
363                )
364                .into()
365            })
366    }
367
368    #[must_use]
369    pub fn get_book_snapshots_topic(
370        &mut self,
371        instrument_id: InstrumentId,
372        interval_ms: NonZeroUsize,
373    ) -> MStr<Topic> {
374        *self
375            .book_snapshots_topics
376            .entry(instrument_id)
377            .or_insert_with(|| {
378                format!(
379                    "data.book.snapshots.{}.{}.{}",
380                    instrument_id.venue, instrument_id.symbol, interval_ms
381                )
382                .into()
383            })
384    }
385
386    #[must_use]
387    pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
388        *self.quote_topics.entry(instrument_id).or_insert_with(|| {
389            format!(
390                "data.quotes.{}.{}",
391                instrument_id.venue, instrument_id.symbol
392            )
393            .into()
394        })
395    }
396
397    #[must_use]
398    pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
399        *self.trade_topics.entry(instrument_id).or_insert_with(|| {
400            format!(
401                "data.trades.{}.{}",
402                instrument_id.venue, instrument_id.symbol
403            )
404            .into()
405        })
406    }
407
408    #[must_use]
409    pub fn get_bars_topic(&mut self, bar_type: BarType) -> MStr<Topic> {
410        *self
411            .bar_topics
412            .entry(bar_type)
413            .or_insert_with(|| format!("data.bars.{bar_type}").into())
414    }
415
416    #[must_use]
417    pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
418        *self
419            .mark_price_topics
420            .entry(instrument_id)
421            .or_insert_with(|| {
422                format!(
423                    "data.mark_prices.{}.{}",
424                    instrument_id.venue, instrument_id.symbol
425                )
426                .into()
427            })
428    }
429
430    #[must_use]
431    pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
432        *self
433            .index_price_topics
434            .entry(instrument_id)
435            .or_insert_with(|| {
436                format!(
437                    "data.index_prices.{}.{}",
438                    instrument_id.venue, instrument_id.symbol
439                )
440                .into()
441            })
442    }
443
444    pub fn get_funding_rate_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
445        *self
446            .funding_rate_topics
447            .entry(instrument_id)
448            .or_insert_with(|| {
449                format!(
450                    "data.funding_rates.{}.{}",
451                    instrument_id.venue, instrument_id.symbol
452                )
453                .into()
454            })
455    }
456
457    #[must_use]
458    pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
459        *self
460            .instrument_status_topics
461            .entry(instrument_id)
462            .or_insert_with(|| {
463                format!(
464                    "data.status.{}.{}",
465                    instrument_id.venue, instrument_id.symbol
466                )
467                .into()
468            })
469    }
470
471    #[must_use]
472    pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
473        *self
474            .instrument_close_topics
475            .entry(instrument_id)
476            .or_insert_with(|| {
477                format!(
478                    "data.close.{}.{}",
479                    instrument_id.venue, instrument_id.symbol
480                )
481                .into()
482            })
483    }
484
485    #[must_use]
486    pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> MStr<Topic> {
487        *self
488            .order_snapshots_topics
489            .entry(client_order_id)
490            .or_insert_with(|| format!("order.snapshots.{client_order_id}").into())
491    }
492
493    #[must_use]
494    pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> MStr<Topic> {
495        *self
496            .positions_snapshots_topics
497            .entry(position_id)
498            .or_insert_with(|| format!("positions.snapshots.{position_id}").into())
499    }
500
501    #[must_use]
502    pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
503        *self
504            .event_orders_topics
505            .entry(strategy_id)
506            .or_insert_with(|| format!("events.order.{strategy_id}").into())
507    }
508
509    #[must_use]
510    pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
511        *self
512            .event_positions_topics
513            .entry(strategy_id)
514            .or_insert_with(|| format!("events.position.{strategy_id}").into())
515    }
516
517    #[cfg(feature = "defi")]
518    #[must_use]
519    pub fn get_defi_blocks_topic(&mut self, chain: Blockchain) -> MStr<Topic> {
520        *self
521            .defi_block_topics
522            .entry(chain)
523            .or_insert_with(|| format!("data.defi.blocks.{chain}").into())
524    }
525
526    #[cfg(feature = "defi")]
527    #[must_use]
528    pub fn get_defi_pool_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
529        *self
530            .defi_pool_topics
531            .entry(instrument_id)
532            .or_insert_with(|| format!("data.defi.pool.{instrument_id}").into())
533    }
534
535    #[cfg(feature = "defi")]
536    #[must_use]
537    pub fn get_defi_pool_swaps_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
538        *self
539            .defi_pool_swap_topics
540            .entry(instrument_id)
541            .or_insert_with(|| format!("data.defi.pool_swaps.{instrument_id}").into())
542    }
543
544    #[cfg(feature = "defi")]
545    #[must_use]
546    pub fn get_defi_pool_liquidity_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
547        *self
548            .defi_pool_liquidity_topics
549            .entry(instrument_id)
550            .or_insert_with(|| format!("data.defi.pool_liquidity.{instrument_id}").into())
551    }
552}
553
554////////////////////////////////////////////////////////////////////////////////
555// Tests
556////////////////////////////////////////////////////////////////////////////////
557#[cfg(test)]
558mod tests {
559    use nautilus_model::{
560        data::{BarType, DataType},
561        identifiers::InstrumentId,
562    };
563    use rstest::*;
564
565    use super::*;
566
567    #[fixture]
568    fn switchboard() -> MessagingSwitchboard {
569        MessagingSwitchboard::default()
570    }
571
572    #[fixture]
573    fn instrument_id() -> InstrumentId {
574        InstrumentId::from("ESZ24.XCME")
575    }
576
577    #[rstest]
578    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
579        let data_type = DataType::new("ExampleDataType", None);
580        let expected_topic = "data.ExampleDataType".into();
581        let result = switchboard.get_custom_topic(&data_type);
582        assert_eq!(result, expected_topic);
583        assert!(switchboard.custom_topics.contains_key(&data_type));
584    }
585
586    #[rstest]
587    fn test_get_instrument_topic(
588        mut switchboard: MessagingSwitchboard,
589        instrument_id: InstrumentId,
590    ) {
591        let expected_topic = "data.instrument.XCME.ESZ24".into();
592        let result = switchboard.get_instrument_topic(instrument_id);
593        assert_eq!(result, expected_topic);
594        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
595    }
596
597    #[rstest]
598    fn test_get_book_deltas_topic(
599        mut switchboard: MessagingSwitchboard,
600        instrument_id: InstrumentId,
601    ) {
602        let expected_topic = "data.book.deltas.XCME.ESZ24".into();
603        let result = switchboard.get_book_deltas_topic(instrument_id);
604        assert_eq!(result, expected_topic);
605        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
606    }
607
608    #[rstest]
609    fn test_get_book_depth10_topic(
610        mut switchboard: MessagingSwitchboard,
611        instrument_id: InstrumentId,
612    ) {
613        let expected_topic = "data.book.depth10.XCME.ESZ24".into();
614        let result = switchboard.get_book_depth10_topic(instrument_id);
615        assert_eq!(result, expected_topic);
616        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
617    }
618
619    #[rstest]
620    fn test_get_book_snapshots_topic(
621        mut switchboard: MessagingSwitchboard,
622        instrument_id: InstrumentId,
623    ) {
624        let expected_topic = "data.book.snapshots.XCME.ESZ24.1000".into();
625        let interval_ms = NonZeroUsize::new(1000).unwrap();
626        let result = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);
627        assert_eq!(result, expected_topic);
628        assert!(
629            switchboard
630                .book_snapshots_topics
631                .contains_key(&instrument_id)
632        );
633    }
634
635    #[rstest]
636    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
637        let expected_topic = "data.quotes.XCME.ESZ24".into();
638        let result = switchboard.get_quotes_topic(instrument_id);
639        assert_eq!(result, expected_topic);
640        assert!(switchboard.quote_topics.contains_key(&instrument_id));
641    }
642
643    #[rstest]
644    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
645        let expected_topic = "data.trades.XCME.ESZ24".into();
646        let result = switchboard.get_trades_topic(instrument_id);
647        assert_eq!(result, expected_topic);
648        assert!(switchboard.trade_topics.contains_key(&instrument_id));
649    }
650
651    #[rstest]
652    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
653        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
654        let expected_topic = format!("data.bars.{bar_type}").into();
655        let result = switchboard.get_bars_topic(bar_type);
656        assert_eq!(result, expected_topic);
657        assert!(switchboard.bar_topics.contains_key(&bar_type));
658    }
659
660    #[rstest]
661    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
662        let client_order_id = ClientOrderId::from("O-123456789");
663        let expected_topic = format!("order.snapshots.{client_order_id}").into();
664        let result = switchboard.get_order_snapshots_topic(client_order_id);
665        assert_eq!(result, expected_topic);
666        assert!(
667            switchboard
668                .order_snapshots_topics
669                .contains_key(&client_order_id)
670        );
671    }
672}