// nautilus_common/msgbus/switchboard.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::num::NonZeroUsize;
17
18use ahash::AHashMap;
19#[cfg(feature = "defi")]
20use nautilus_model::defi::Blockchain;
21use nautilus_model::{
22    data::{BarType, DataType},
23    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
24};
25
26use super::core::{Endpoint, MStr, Topic};
27use crate::msgbus::get_message_bus;
28
/// The literal topic name `CLOSE`.
// NOTE(review): not referenced anywhere in this file; presumably consumed by other
// message bus code — confirm against callers.
pub const CLOSE_TOPIC: &str = "CLOSE";
30
31#[must_use]
32pub fn get_custom_topic(data_type: &DataType) -> MStr<Topic> {
33    get_message_bus()
34        .borrow_mut()
35        .switchboard
36        .get_custom_topic(data_type)
37}
38
39#[must_use]
40pub fn get_instruments_topic(venue: Venue) -> MStr<Topic> {
41    get_message_bus()
42        .borrow_mut()
43        .switchboard
44        .get_instruments_topic(venue)
45}
46
47#[must_use]
48pub fn get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic> {
49    get_message_bus()
50        .borrow_mut()
51        .switchboard
52        .get_instrument_topic(instrument_id)
53}
54
55#[must_use]
56pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic> {
57    get_message_bus()
58        .borrow_mut()
59        .switchboard
60        .get_book_deltas_topic(instrument_id)
61}
62
63#[must_use]
64pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic> {
65    get_message_bus()
66        .borrow_mut()
67        .switchboard
68        .get_book_depth10_topic(instrument_id)
69}
70
71#[must_use]
72pub fn get_book_snapshots_topic(
73    instrument_id: InstrumentId,
74    interval_ms: NonZeroUsize,
75) -> MStr<Topic> {
76    get_message_bus()
77        .borrow_mut()
78        .switchboard
79        .get_book_snapshots_topic(instrument_id, interval_ms)
80}
81
82#[must_use]
83pub fn get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic> {
84    get_message_bus()
85        .borrow_mut()
86        .switchboard
87        .get_quotes_topic(instrument_id)
88}
89
90#[must_use]
91pub fn get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic> {
92    get_message_bus()
93        .borrow_mut()
94        .switchboard
95        .get_trades_topic(instrument_id)
96}
97
98#[must_use]
99pub fn get_bars_topic(bar_type: BarType) -> MStr<Topic> {
100    get_message_bus()
101        .borrow_mut()
102        .switchboard
103        .get_bars_topic(bar_type)
104}
105
106#[must_use]
107pub fn get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
108    get_message_bus()
109        .borrow_mut()
110        .switchboard
111        .get_mark_price_topic(instrument_id)
112}
113
114#[must_use]
115pub fn get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
116    get_message_bus()
117        .borrow_mut()
118        .switchboard
119        .get_index_price_topic(instrument_id)
120}
121
122#[must_use]
123pub fn get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic> {
124    get_message_bus()
125        .borrow_mut()
126        .switchboard
127        .get_funding_rate_topic(instrument_id)
128}
129
130#[must_use]
131pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic> {
132    get_message_bus()
133        .borrow_mut()
134        .switchboard
135        .get_instrument_status_topic(instrument_id)
136}
137
138#[must_use]
139pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic> {
140    get_message_bus()
141        .borrow_mut()
142        .switchboard
143        .get_instrument_close_topic(instrument_id)
144}
145
146#[must_use]
147pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic> {
148    get_message_bus()
149        .borrow_mut()
150        .switchboard
151        .get_order_snapshots_topic(client_order_id)
152}
153
154#[must_use]
155pub fn get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic> {
156    get_message_bus()
157        .borrow_mut()
158        .switchboard
159        .get_positions_snapshots_topic(position_id)
160}
161
162#[must_use]
163pub fn get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic> {
164    get_message_bus()
165        .borrow_mut()
166        .switchboard
167        .get_event_orders_topic(strategy_id)
168}
169
170#[must_use]
171pub fn get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic> {
172    get_message_bus()
173        .borrow_mut()
174        .switchboard
175        .get_event_positions_topic(strategy_id)
176}
177
/// Returns the DeFi blocks topic for `chain` (`data.defi.blocks.{chain}`).
///
/// Mutably borrows the bus returned by `get_message_bus()` and delegates to
/// [`MessagingSwitchboard::get_defi_blocks_topic`].
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_blocks_topic(chain: Blockchain) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_blocks_topic(chain)
}
186
/// Returns the DeFi pool topic for `instrument_id` (`data.defi.pool.{instrument_id}`).
///
/// Mutably borrows the bus returned by `get_message_bus()` and delegates to
/// [`MessagingSwitchboard::get_defi_pool_topic`].
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_pool_topic(instrument_id: InstrumentId) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_pool_topic(instrument_id)
}
195
/// Returns the DeFi pool swaps topic for `instrument_id`
/// (`data.defi.pool_swaps.{instrument_id}`).
///
/// Mutably borrows the bus returned by `get_message_bus()` and delegates to
/// [`MessagingSwitchboard::get_defi_pool_swaps_topic`].
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_pool_swaps_topic(instrument_id: InstrumentId) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_pool_swaps_topic(instrument_id)
}
204
/// Returns the DeFi pool liquidity topic for `instrument_id`
/// (`data.defi.pool_liquidity.{instrument_id}`).
///
/// Mutably borrows the bus returned by `get_message_bus()` and delegates to
/// [`MessagingSwitchboard::get_defi_pool_liquidity_topic`].
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_liquidity_topic(instrument_id: InstrumentId) -> MStr<Topic> {
    let msgbus = get_message_bus();
    let mut bus = msgbus.borrow_mut();
    bus.switchboard.get_defi_pool_liquidity_topic(instrument_id)
}
213
/// Returns the DeFi pool collect topic for `instrument_id`
/// (`data.defi.pool_collect.{instrument_id}`).
///
/// Mutably borrows the bus returned by `get_message_bus()` and delegates to
/// [`MessagingSwitchboard::get_defi_pool_collect_topics`], so collect subscribers and
/// publishers use a topic distinct from the pool liquidity topic.
#[cfg(feature = "defi")]
#[must_use]
pub fn get_defi_collect_topic(instrument_id: InstrumentId) -> MStr<Topic> {
    get_message_bus()
        .borrow_mut()
        .switchboard
        .get_defi_pool_collect_topics(instrument_id)
}
222
223/// Represents a switchboard of built-in messaging endpoint names.
224#[derive(Clone, Debug)]
225pub struct MessagingSwitchboard {
226    custom_topics: AHashMap<DataType, MStr<Topic>>,
227    instruments_topics: AHashMap<Venue, MStr<Topic>>,
228    instrument_topics: AHashMap<InstrumentId, MStr<Topic>>,
229    book_deltas_topics: AHashMap<InstrumentId, MStr<Topic>>,
230    book_depth10_topics: AHashMap<InstrumentId, MStr<Topic>>,
231    book_snapshots_topics: AHashMap<InstrumentId, MStr<Topic>>,
232    quote_topics: AHashMap<InstrumentId, MStr<Topic>>,
233    trade_topics: AHashMap<InstrumentId, MStr<Topic>>,
234    bar_topics: AHashMap<BarType, MStr<Topic>>,
235    mark_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
236    index_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
237    funding_rate_topics: AHashMap<InstrumentId, MStr<Topic>>,
238    instrument_status_topics: AHashMap<InstrumentId, MStr<Topic>>,
239    instrument_close_topics: AHashMap<InstrumentId, MStr<Topic>>,
240    event_orders_topics: AHashMap<StrategyId, MStr<Topic>>,
241    event_positions_topics: AHashMap<StrategyId, MStr<Topic>>,
242    order_snapshots_topics: AHashMap<ClientOrderId, MStr<Topic>>,
243    positions_snapshots_topics: AHashMap<PositionId, MStr<Topic>>,
244    #[cfg(feature = "defi")]
245    defi_block_topics: AHashMap<Blockchain, MStr<Topic>>,
246    #[cfg(feature = "defi")]
247    defi_pool_topics: AHashMap<InstrumentId, MStr<Topic>>,
248    #[cfg(feature = "defi")]
249    defi_pool_swap_topics: AHashMap<InstrumentId, MStr<Topic>>,
250    #[cfg(feature = "defi")]
251    defi_pool_liquidity_topics: AHashMap<InstrumentId, MStr<Topic>>,
252    #[cfg(feature = "defi")]
253    defi_pool_collect_topics: AHashMap<InstrumentId, MStr<Topic>>,
254}
255
256impl Default for MessagingSwitchboard {
257    /// Creates a new default [`MessagingSwitchboard`] instance.
258    fn default() -> Self {
259        Self {
260            custom_topics: AHashMap::new(),
261            instruments_topics: AHashMap::new(),
262            instrument_topics: AHashMap::new(),
263            book_deltas_topics: AHashMap::new(),
264            book_snapshots_topics: AHashMap::new(),
265            book_depth10_topics: AHashMap::new(),
266            quote_topics: AHashMap::new(),
267            trade_topics: AHashMap::new(),
268            mark_price_topics: AHashMap::new(),
269            index_price_topics: AHashMap::new(),
270            funding_rate_topics: AHashMap::new(),
271            bar_topics: AHashMap::new(),
272            instrument_status_topics: AHashMap::new(),
273            instrument_close_topics: AHashMap::new(),
274            order_snapshots_topics: AHashMap::new(),
275            event_orders_topics: AHashMap::new(),
276            event_positions_topics: AHashMap::new(),
277            positions_snapshots_topics: AHashMap::new(),
278            #[cfg(feature = "defi")]
279            defi_block_topics: AHashMap::new(),
280            #[cfg(feature = "defi")]
281            defi_pool_topics: AHashMap::new(),
282            #[cfg(feature = "defi")]
283            defi_pool_swap_topics: AHashMap::new(),
284            #[cfg(feature = "defi")]
285            defi_pool_liquidity_topics: AHashMap::new(),
286            #[cfg(feature = "defi")]
287            defi_pool_collect_topics: AHashMap::new(),
288        }
289    }
290}
291
292impl MessagingSwitchboard {
293    #[must_use]
294    pub fn data_engine_queue_execute() -> MStr<Endpoint> {
295        "DataEngine.queue_execute".into()
296    }
297
298    #[must_use]
299    pub fn data_engine_execute() -> MStr<Endpoint> {
300        "DataEngine.execute".into()
301    }
302
303    #[must_use]
304    pub fn data_engine_process() -> MStr<Endpoint> {
305        "DataEngine.process".into()
306    }
307
308    #[must_use]
309    pub fn data_engine_response() -> MStr<Endpoint> {
310        "DataEngine.response".into()
311    }
312
313    #[must_use]
314    pub fn exec_engine_execute() -> MStr<Endpoint> {
315        "ExecEngine.execute".into()
316    }
317
318    #[must_use]
319    pub fn exec_engine_process() -> MStr<Endpoint> {
320        "ExecEngine.process".into()
321    }
322
323    #[must_use]
324    pub fn get_custom_topic(&mut self, data_type: &DataType) -> MStr<Topic> {
325        *self
326            .custom_topics
327            .entry(data_type.clone())
328            .or_insert_with(|| format!("data.{}", data_type.topic()).into())
329    }
330
331    #[must_use]
332    pub fn get_instruments_topic(&mut self, venue: Venue) -> MStr<Topic> {
333        *self
334            .instruments_topics
335            .entry(venue)
336            .or_insert_with(|| format!("data.instrument.{venue}").into())
337    }
338
339    #[must_use]
340    pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
341        *self
342            .instrument_topics
343            .entry(instrument_id)
344            .or_insert_with(|| {
345                format!(
346                    "data.instrument.{}.{}",
347                    instrument_id.venue, instrument_id.symbol
348                )
349                .into()
350            })
351    }
352
353    #[must_use]
354    pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
355        *self
356            .book_deltas_topics
357            .entry(instrument_id)
358            .or_insert_with(|| {
359                format!(
360                    "data.book.deltas.{}.{}",
361                    instrument_id.venue, instrument_id.symbol
362                )
363                .into()
364            })
365    }
366
367    #[must_use]
368    pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
369        *self
370            .book_depth10_topics
371            .entry(instrument_id)
372            .or_insert_with(|| {
373                format!(
374                    "data.book.depth10.{}.{}",
375                    instrument_id.venue, instrument_id.symbol
376                )
377                .into()
378            })
379    }
380
381    #[must_use]
382    pub fn get_book_snapshots_topic(
383        &mut self,
384        instrument_id: InstrumentId,
385        interval_ms: NonZeroUsize,
386    ) -> MStr<Topic> {
387        *self
388            .book_snapshots_topics
389            .entry(instrument_id)
390            .or_insert_with(|| {
391                format!(
392                    "data.book.snapshots.{}.{}.{}",
393                    instrument_id.venue, instrument_id.symbol, interval_ms
394                )
395                .into()
396            })
397    }
398
399    #[must_use]
400    pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
401        *self.quote_topics.entry(instrument_id).or_insert_with(|| {
402            format!(
403                "data.quotes.{}.{}",
404                instrument_id.venue, instrument_id.symbol
405            )
406            .into()
407        })
408    }
409
410    #[must_use]
411    pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
412        *self.trade_topics.entry(instrument_id).or_insert_with(|| {
413            format!(
414                "data.trades.{}.{}",
415                instrument_id.venue, instrument_id.symbol
416            )
417            .into()
418        })
419    }
420
421    #[must_use]
422    pub fn get_bars_topic(&mut self, bar_type: BarType) -> MStr<Topic> {
423        *self
424            .bar_topics
425            .entry(bar_type)
426            .or_insert_with(|| format!("data.bars.{bar_type}").into())
427    }
428
429    #[must_use]
430    pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
431        *self
432            .mark_price_topics
433            .entry(instrument_id)
434            .or_insert_with(|| {
435                format!(
436                    "data.mark_prices.{}.{}",
437                    instrument_id.venue, instrument_id.symbol
438                )
439                .into()
440            })
441    }
442
443    #[must_use]
444    pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
445        *self
446            .index_price_topics
447            .entry(instrument_id)
448            .or_insert_with(|| {
449                format!(
450                    "data.index_prices.{}.{}",
451                    instrument_id.venue, instrument_id.symbol
452                )
453                .into()
454            })
455    }
456
457    pub fn get_funding_rate_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
458        *self
459            .funding_rate_topics
460            .entry(instrument_id)
461            .or_insert_with(|| {
462                format!(
463                    "data.funding_rates.{}.{}",
464                    instrument_id.venue, instrument_id.symbol
465                )
466                .into()
467            })
468    }
469
470    #[must_use]
471    pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
472        *self
473            .instrument_status_topics
474            .entry(instrument_id)
475            .or_insert_with(|| {
476                format!(
477                    "data.status.{}.{}",
478                    instrument_id.venue, instrument_id.symbol
479                )
480                .into()
481            })
482    }
483
484    #[must_use]
485    pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
486        *self
487            .instrument_close_topics
488            .entry(instrument_id)
489            .or_insert_with(|| {
490                format!(
491                    "data.close.{}.{}",
492                    instrument_id.venue, instrument_id.symbol
493                )
494                .into()
495            })
496    }
497
498    #[must_use]
499    pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> MStr<Topic> {
500        *self
501            .order_snapshots_topics
502            .entry(client_order_id)
503            .or_insert_with(|| format!("order.snapshots.{client_order_id}").into())
504    }
505
506    #[must_use]
507    pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> MStr<Topic> {
508        *self
509            .positions_snapshots_topics
510            .entry(position_id)
511            .or_insert_with(|| format!("positions.snapshots.{position_id}").into())
512    }
513
514    #[must_use]
515    pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
516        *self
517            .event_orders_topics
518            .entry(strategy_id)
519            .or_insert_with(|| format!("events.order.{strategy_id}").into())
520    }
521
522    #[must_use]
523    pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
524        *self
525            .event_positions_topics
526            .entry(strategy_id)
527            .or_insert_with(|| format!("events.position.{strategy_id}").into())
528    }
529
530    #[cfg(feature = "defi")]
531    #[must_use]
532    pub fn get_defi_blocks_topic(&mut self, chain: Blockchain) -> MStr<Topic> {
533        *self
534            .defi_block_topics
535            .entry(chain)
536            .or_insert_with(|| format!("data.defi.blocks.{chain}").into())
537    }
538
539    #[cfg(feature = "defi")]
540    #[must_use]
541    pub fn get_defi_pool_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
542        *self
543            .defi_pool_topics
544            .entry(instrument_id)
545            .or_insert_with(|| format!("data.defi.pool.{instrument_id}").into())
546    }
547
548    #[cfg(feature = "defi")]
549    #[must_use]
550    pub fn get_defi_pool_swaps_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
551        *self
552            .defi_pool_swap_topics
553            .entry(instrument_id)
554            .or_insert_with(|| format!("data.defi.pool_swaps.{instrument_id}").into())
555    }
556
557    #[cfg(feature = "defi")]
558    #[must_use]
559    pub fn get_defi_pool_liquidity_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
560        *self
561            .defi_pool_liquidity_topics
562            .entry(instrument_id)
563            .or_insert_with(|| format!("data.defi.pool_liquidity.{instrument_id}").into())
564    }
565
566    #[cfg(feature = "defi")]
567    #[must_use]
568    pub fn get_defi_pool_collect_topics(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
569        *self
570            .defi_pool_collect_topics
571            .entry(instrument_id)
572            .or_insert_with(|| format!("data.defi.pool_collect.{instrument_id}").into())
573    }
574}
575
576////////////////////////////////////////////////////////////////////////////////
577// Tests
578////////////////////////////////////////////////////////////////////////////////
579#[cfg(test)]
580mod tests {
581    use nautilus_model::{
582        data::{BarType, DataType},
583        identifiers::InstrumentId,
584    };
585    use rstest::*;
586
587    use super::*;
588
589    #[fixture]
590    fn switchboard() -> MessagingSwitchboard {
591        MessagingSwitchboard::default()
592    }
593
594    #[fixture]
595    fn instrument_id() -> InstrumentId {
596        InstrumentId::from("ESZ24.XCME")
597    }
598
599    #[rstest]
600    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
601        let data_type = DataType::new("ExampleDataType", None);
602        let expected_topic = "data.ExampleDataType".into();
603        let result = switchboard.get_custom_topic(&data_type);
604        assert_eq!(result, expected_topic);
605        assert!(switchboard.custom_topics.contains_key(&data_type));
606    }
607
608    #[rstest]
609    fn test_get_instrument_topic(
610        mut switchboard: MessagingSwitchboard,
611        instrument_id: InstrumentId,
612    ) {
613        let expected_topic = "data.instrument.XCME.ESZ24".into();
614        let result = switchboard.get_instrument_topic(instrument_id);
615        assert_eq!(result, expected_topic);
616        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
617    }
618
619    #[rstest]
620    fn test_get_book_deltas_topic(
621        mut switchboard: MessagingSwitchboard,
622        instrument_id: InstrumentId,
623    ) {
624        let expected_topic = "data.book.deltas.XCME.ESZ24".into();
625        let result = switchboard.get_book_deltas_topic(instrument_id);
626        assert_eq!(result, expected_topic);
627        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
628    }
629
630    #[rstest]
631    fn test_get_book_depth10_topic(
632        mut switchboard: MessagingSwitchboard,
633        instrument_id: InstrumentId,
634    ) {
635        let expected_topic = "data.book.depth10.XCME.ESZ24".into();
636        let result = switchboard.get_book_depth10_topic(instrument_id);
637        assert_eq!(result, expected_topic);
638        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
639    }
640
641    #[rstest]
642    fn test_get_book_snapshots_topic(
643        mut switchboard: MessagingSwitchboard,
644        instrument_id: InstrumentId,
645    ) {
646        let expected_topic = "data.book.snapshots.XCME.ESZ24.1000".into();
647        let interval_ms = NonZeroUsize::new(1000).unwrap();
648        let result = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);
649        assert_eq!(result, expected_topic);
650        assert!(
651            switchboard
652                .book_snapshots_topics
653                .contains_key(&instrument_id)
654        );
655    }
656
657    #[rstest]
658    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
659        let expected_topic = "data.quotes.XCME.ESZ24".into();
660        let result = switchboard.get_quotes_topic(instrument_id);
661        assert_eq!(result, expected_topic);
662        assert!(switchboard.quote_topics.contains_key(&instrument_id));
663    }
664
665    #[rstest]
666    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
667        let expected_topic = "data.trades.XCME.ESZ24".into();
668        let result = switchboard.get_trades_topic(instrument_id);
669        assert_eq!(result, expected_topic);
670        assert!(switchboard.trade_topics.contains_key(&instrument_id));
671    }
672
673    #[rstest]
674    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
675        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
676        let expected_topic = format!("data.bars.{bar_type}").into();
677        let result = switchboard.get_bars_topic(bar_type);
678        assert_eq!(result, expected_topic);
679        assert!(switchboard.bar_topics.contains_key(&bar_type));
680    }
681
682    #[rstest]
683    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
684        let client_order_id = ClientOrderId::from("O-123456789");
685        let expected_topic = format!("order.snapshots.{client_order_id}").into();
686        let result = switchboard.get_order_snapshots_topic(client_order_id);
687        assert_eq!(result, expected_topic);
688        assert!(
689            switchboard
690                .order_snapshots_topics
691                .contains_key(&client_order_id)
692        );
693    }
694}