nautilus_common/msgbus/
switchboard.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::num::NonZeroUsize;
17
18use ahash::AHashMap;
19use nautilus_model::{
20    data::{BarType, DataType},
21    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
22};
23
24use super::core::{Endpoint, MStr, Topic};
25use crate::msgbus::get_message_bus;
26
/// Topic name literal `"CLOSE"`.
///
/// NOTE(review): not referenced anywhere in this file — confirm the intended consumers.
pub const CLOSE_TOPIC: &str = "CLOSE";
28
29#[must_use]
30pub fn get_custom_topic(data_type: &DataType) -> MStr<Topic> {
31    get_message_bus()
32        .borrow_mut()
33        .switchboard
34        .get_custom_topic(data_type)
35}
36
37#[must_use]
38pub fn get_instruments_topic(venue: Venue) -> MStr<Topic> {
39    get_message_bus()
40        .borrow_mut()
41        .switchboard
42        .get_instruments_topic(venue)
43}
44
45#[must_use]
46pub fn get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic> {
47    get_message_bus()
48        .borrow_mut()
49        .switchboard
50        .get_instrument_topic(instrument_id)
51}
52
53#[must_use]
54pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic> {
55    get_message_bus()
56        .borrow_mut()
57        .switchboard
58        .get_book_deltas_topic(instrument_id)
59}
60
61#[must_use]
62pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic> {
63    get_message_bus()
64        .borrow_mut()
65        .switchboard
66        .get_book_depth10_topic(instrument_id)
67}
68
69#[must_use]
70pub fn get_book_snapshots_topic(
71    instrument_id: InstrumentId,
72    interval_ms: NonZeroUsize,
73) -> MStr<Topic> {
74    get_message_bus()
75        .borrow_mut()
76        .switchboard
77        .get_book_snapshots_topic(instrument_id, interval_ms)
78}
79
80#[must_use]
81pub fn get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic> {
82    get_message_bus()
83        .borrow_mut()
84        .switchboard
85        .get_quotes_topic(instrument_id)
86}
87
88#[must_use]
89pub fn get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic> {
90    get_message_bus()
91        .borrow_mut()
92        .switchboard
93        .get_trades_topic(instrument_id)
94}
95
96#[must_use]
97pub fn get_bars_topic(bar_type: BarType) -> MStr<Topic> {
98    get_message_bus()
99        .borrow_mut()
100        .switchboard
101        .get_bars_topic(bar_type)
102}
103
104#[must_use]
105pub fn get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
106    get_message_bus()
107        .borrow_mut()
108        .switchboard
109        .get_mark_price_topic(instrument_id)
110}
111
112#[must_use]
113pub fn get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
114    get_message_bus()
115        .borrow_mut()
116        .switchboard
117        .get_index_price_topic(instrument_id)
118}
119
120#[must_use]
121pub fn get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic> {
122    get_message_bus()
123        .borrow_mut()
124        .switchboard
125        .get_funding_rate_topic(instrument_id)
126}
127
128#[must_use]
129pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic> {
130    get_message_bus()
131        .borrow_mut()
132        .switchboard
133        .get_instrument_status_topic(instrument_id)
134}
135
136#[must_use]
137pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic> {
138    get_message_bus()
139        .borrow_mut()
140        .switchboard
141        .get_instrument_close_topic(instrument_id)
142}
143
144#[must_use]
145pub fn get_order_fills_topic(instrument_id: InstrumentId) -> MStr<Topic> {
146    get_message_bus()
147        .borrow_mut()
148        .switchboard
149        .get_order_fills_topic(instrument_id)
150}
151
152#[must_use]
153pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic> {
154    get_message_bus()
155        .borrow_mut()
156        .switchboard
157        .get_order_snapshots_topic(client_order_id)
158}
159
160#[must_use]
161pub fn get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic> {
162    get_message_bus()
163        .borrow_mut()
164        .switchboard
165        .get_positions_snapshots_topic(position_id)
166}
167
168#[must_use]
169pub fn get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic> {
170    get_message_bus()
171        .borrow_mut()
172        .switchboard
173        .get_event_orders_topic(strategy_id)
174}
175
176#[must_use]
177pub fn get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic> {
178    get_message_bus()
179        .borrow_mut()
180        .switchboard
181        .get_event_positions_topic(strategy_id)
182}
183
184/// Represents a switchboard of built-in messaging endpoint names.
185#[derive(Clone, Debug)]
186pub struct MessagingSwitchboard {
187    custom_topics: AHashMap<DataType, MStr<Topic>>,
188    instruments_topics: AHashMap<Venue, MStr<Topic>>,
189    instrument_topics: AHashMap<InstrumentId, MStr<Topic>>,
190    book_deltas_topics: AHashMap<InstrumentId, MStr<Topic>>,
191    book_depth10_topics: AHashMap<InstrumentId, MStr<Topic>>,
192    book_snapshots_topics: AHashMap<InstrumentId, MStr<Topic>>,
193    quote_topics: AHashMap<InstrumentId, MStr<Topic>>,
194    trade_topics: AHashMap<InstrumentId, MStr<Topic>>,
195    bar_topics: AHashMap<BarType, MStr<Topic>>,
196    mark_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
197    index_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
198    funding_rate_topics: AHashMap<InstrumentId, MStr<Topic>>,
199    instrument_status_topics: AHashMap<InstrumentId, MStr<Topic>>,
200    instrument_close_topics: AHashMap<InstrumentId, MStr<Topic>>,
201    order_fills_topics: AHashMap<InstrumentId, MStr<Topic>>,
202    event_orders_topics: AHashMap<StrategyId, MStr<Topic>>,
203    event_positions_topics: AHashMap<StrategyId, MStr<Topic>>,
204    order_snapshots_topics: AHashMap<ClientOrderId, MStr<Topic>>,
205    positions_snapshots_topics: AHashMap<PositionId, MStr<Topic>>,
206    #[cfg(feature = "defi")]
207    pub(crate) defi: crate::defi::switchboard::DefiSwitchboard,
208}
209
210impl Default for MessagingSwitchboard {
211    /// Creates a new default [`MessagingSwitchboard`] instance.
212    fn default() -> Self {
213        Self {
214            custom_topics: AHashMap::new(),
215            instruments_topics: AHashMap::new(),
216            instrument_topics: AHashMap::new(),
217            book_deltas_topics: AHashMap::new(),
218            book_snapshots_topics: AHashMap::new(),
219            book_depth10_topics: AHashMap::new(),
220            quote_topics: AHashMap::new(),
221            trade_topics: AHashMap::new(),
222            mark_price_topics: AHashMap::new(),
223            index_price_topics: AHashMap::new(),
224            funding_rate_topics: AHashMap::new(),
225            bar_topics: AHashMap::new(),
226            instrument_status_topics: AHashMap::new(),
227            instrument_close_topics: AHashMap::new(),
228            order_fills_topics: AHashMap::new(),
229            order_snapshots_topics: AHashMap::new(),
230            event_orders_topics: AHashMap::new(),
231            event_positions_topics: AHashMap::new(),
232            positions_snapshots_topics: AHashMap::new(),
233            #[cfg(feature = "defi")]
234            defi: crate::defi::switchboard::DefiSwitchboard::default(),
235        }
236    }
237}
238
239impl MessagingSwitchboard {
240    #[must_use]
241    pub fn data_engine_queue_execute() -> MStr<Endpoint> {
242        "DataEngine.queue_execute".into()
243    }
244
245    #[must_use]
246    pub fn data_engine_execute() -> MStr<Endpoint> {
247        "DataEngine.execute".into()
248    }
249
250    #[must_use]
251    pub fn data_engine_process() -> MStr<Endpoint> {
252        "DataEngine.process".into()
253    }
254
255    #[must_use]
256    pub fn data_engine_response() -> MStr<Endpoint> {
257        "DataEngine.response".into()
258    }
259
260    #[must_use]
261    pub fn exec_engine_execute() -> MStr<Endpoint> {
262        "ExecEngine.execute".into()
263    }
264
265    #[must_use]
266    pub fn exec_engine_process() -> MStr<Endpoint> {
267        "ExecEngine.process".into()
268    }
269
270    #[must_use]
271    pub fn get_custom_topic(&mut self, data_type: &DataType) -> MStr<Topic> {
272        *self
273            .custom_topics
274            .entry(data_type.clone())
275            .or_insert_with(|| format!("data.{}", data_type.topic()).into())
276    }
277
278    #[must_use]
279    pub fn get_instruments_topic(&mut self, venue: Venue) -> MStr<Topic> {
280        *self
281            .instruments_topics
282            .entry(venue)
283            .or_insert_with(|| format!("data.instrument.{venue}").into())
284    }
285
286    #[must_use]
287    pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
288        *self
289            .instrument_topics
290            .entry(instrument_id)
291            .or_insert_with(|| {
292                format!(
293                    "data.instrument.{}.{}",
294                    instrument_id.venue, instrument_id.symbol
295                )
296                .into()
297            })
298    }
299
300    #[must_use]
301    pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
302        *self
303            .book_deltas_topics
304            .entry(instrument_id)
305            .or_insert_with(|| {
306                format!(
307                    "data.book.deltas.{}.{}",
308                    instrument_id.venue, instrument_id.symbol
309                )
310                .into()
311            })
312    }
313
314    #[must_use]
315    pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
316        *self
317            .book_depth10_topics
318            .entry(instrument_id)
319            .or_insert_with(|| {
320                format!(
321                    "data.book.depth10.{}.{}",
322                    instrument_id.venue, instrument_id.symbol
323                )
324                .into()
325            })
326    }
327
328    #[must_use]
329    pub fn get_book_snapshots_topic(
330        &mut self,
331        instrument_id: InstrumentId,
332        interval_ms: NonZeroUsize,
333    ) -> MStr<Topic> {
334        *self
335            .book_snapshots_topics
336            .entry(instrument_id)
337            .or_insert_with(|| {
338                format!(
339                    "data.book.snapshots.{}.{}.{}",
340                    instrument_id.venue, instrument_id.symbol, interval_ms
341                )
342                .into()
343            })
344    }
345
346    #[must_use]
347    pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
348        *self.quote_topics.entry(instrument_id).or_insert_with(|| {
349            format!(
350                "data.quotes.{}.{}",
351                instrument_id.venue, instrument_id.symbol
352            )
353            .into()
354        })
355    }
356
357    #[must_use]
358    pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
359        *self.trade_topics.entry(instrument_id).or_insert_with(|| {
360            format!(
361                "data.trades.{}.{}",
362                instrument_id.venue, instrument_id.symbol
363            )
364            .into()
365        })
366    }
367
368    #[must_use]
369    pub fn get_bars_topic(&mut self, bar_type: BarType) -> MStr<Topic> {
370        *self
371            .bar_topics
372            .entry(bar_type)
373            .or_insert_with(|| format!("data.bars.{bar_type}").into())
374    }
375
376    #[must_use]
377    pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
378        *self
379            .mark_price_topics
380            .entry(instrument_id)
381            .or_insert_with(|| {
382                format!(
383                    "data.mark_prices.{}.{}",
384                    instrument_id.venue, instrument_id.symbol
385                )
386                .into()
387            })
388    }
389
390    #[must_use]
391    pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
392        *self
393            .index_price_topics
394            .entry(instrument_id)
395            .or_insert_with(|| {
396                format!(
397                    "data.index_prices.{}.{}",
398                    instrument_id.venue, instrument_id.symbol
399                )
400                .into()
401            })
402    }
403
404    pub fn get_funding_rate_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
405        *self
406            .funding_rate_topics
407            .entry(instrument_id)
408            .or_insert_with(|| {
409                format!(
410                    "data.funding_rates.{}.{}",
411                    instrument_id.venue, instrument_id.symbol
412                )
413                .into()
414            })
415    }
416
417    #[must_use]
418    pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
419        *self
420            .instrument_status_topics
421            .entry(instrument_id)
422            .or_insert_with(|| {
423                format!(
424                    "data.status.{}.{}",
425                    instrument_id.venue, instrument_id.symbol
426                )
427                .into()
428            })
429    }
430
431    #[must_use]
432    pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
433        *self
434            .instrument_close_topics
435            .entry(instrument_id)
436            .or_insert_with(|| {
437                format!(
438                    "data.close.{}.{}",
439                    instrument_id.venue, instrument_id.symbol
440                )
441                .into()
442            })
443    }
444
445    #[must_use]
446    pub fn get_order_fills_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
447        *self
448            .order_fills_topics
449            .entry(instrument_id)
450            .or_insert_with(|| format!("events.fills.{instrument_id}").into())
451    }
452
453    #[must_use]
454    pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> MStr<Topic> {
455        *self
456            .order_snapshots_topics
457            .entry(client_order_id)
458            .or_insert_with(|| format!("order.snapshots.{client_order_id}").into())
459    }
460
461    #[must_use]
462    pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> MStr<Topic> {
463        *self
464            .positions_snapshots_topics
465            .entry(position_id)
466            .or_insert_with(|| format!("positions.snapshots.{position_id}").into())
467    }
468
469    #[must_use]
470    pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
471        *self
472            .event_orders_topics
473            .entry(strategy_id)
474            .or_insert_with(|| format!("events.order.{strategy_id}").into())
475    }
476
477    #[must_use]
478    pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
479        *self
480            .event_positions_topics
481            .entry(strategy_id)
482            .or_insert_with(|| format!("events.position.{strategy_id}").into())
483    }
484}
485
486////////////////////////////////////////////////////////////////////////////////
487// Tests
488////////////////////////////////////////////////////////////////////////////////
489#[cfg(test)]
490mod tests {
491    use nautilus_model::{
492        data::{BarType, DataType},
493        identifiers::InstrumentId,
494    };
495    use rstest::*;
496
497    use super::*;
498
499    #[fixture]
500    fn switchboard() -> MessagingSwitchboard {
501        MessagingSwitchboard::default()
502    }
503
504    #[fixture]
505    fn instrument_id() -> InstrumentId {
506        InstrumentId::from("ESZ24.XCME")
507    }
508
509    #[rstest]
510    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
511        let data_type = DataType::new("ExampleDataType", None);
512        let expected_topic = "data.ExampleDataType".into();
513        let result = switchboard.get_custom_topic(&data_type);
514        assert_eq!(result, expected_topic);
515        assert!(switchboard.custom_topics.contains_key(&data_type));
516    }
517
518    #[rstest]
519    fn test_get_instrument_topic(
520        mut switchboard: MessagingSwitchboard,
521        instrument_id: InstrumentId,
522    ) {
523        let expected_topic = "data.instrument.XCME.ESZ24".into();
524        let result = switchboard.get_instrument_topic(instrument_id);
525        assert_eq!(result, expected_topic);
526        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
527    }
528
529    #[rstest]
530    fn test_get_book_deltas_topic(
531        mut switchboard: MessagingSwitchboard,
532        instrument_id: InstrumentId,
533    ) {
534        let expected_topic = "data.book.deltas.XCME.ESZ24".into();
535        let result = switchboard.get_book_deltas_topic(instrument_id);
536        assert_eq!(result, expected_topic);
537        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
538    }
539
540    #[rstest]
541    fn test_get_book_depth10_topic(
542        mut switchboard: MessagingSwitchboard,
543        instrument_id: InstrumentId,
544    ) {
545        let expected_topic = "data.book.depth10.XCME.ESZ24".into();
546        let result = switchboard.get_book_depth10_topic(instrument_id);
547        assert_eq!(result, expected_topic);
548        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
549    }
550
551    #[rstest]
552    fn test_get_book_snapshots_topic(
553        mut switchboard: MessagingSwitchboard,
554        instrument_id: InstrumentId,
555    ) {
556        let expected_topic = "data.book.snapshots.XCME.ESZ24.1000".into();
557        let interval_ms = NonZeroUsize::new(1000).unwrap();
558        let result = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);
559        assert_eq!(result, expected_topic);
560        assert!(
561            switchboard
562                .book_snapshots_topics
563                .contains_key(&instrument_id)
564        );
565    }
566
567    #[rstest]
568    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
569        let expected_topic = "data.quotes.XCME.ESZ24".into();
570        let result = switchboard.get_quotes_topic(instrument_id);
571        assert_eq!(result, expected_topic);
572        assert!(switchboard.quote_topics.contains_key(&instrument_id));
573    }
574
575    #[rstest]
576    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
577        let expected_topic = "data.trades.XCME.ESZ24".into();
578        let result = switchboard.get_trades_topic(instrument_id);
579        assert_eq!(result, expected_topic);
580        assert!(switchboard.trade_topics.contains_key(&instrument_id));
581    }
582
583    #[rstest]
584    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
585        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
586        let expected_topic = format!("data.bars.{bar_type}").into();
587        let result = switchboard.get_bars_topic(bar_type);
588        assert_eq!(result, expected_topic);
589        assert!(switchboard.bar_topics.contains_key(&bar_type));
590    }
591
592    #[rstest]
593    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
594        let client_order_id = ClientOrderId::from("O-123456789");
595        let expected_topic = format!("order.snapshots.{client_order_id}").into();
596        let result = switchboard.get_order_snapshots_topic(client_order_id);
597        assert_eq!(result, expected_topic);
598        assert!(
599            switchboard
600                .order_snapshots_topics
601                .contains_key(&client_order_id)
602        );
603    }
604}