nautilus_common/msgbus/
switchboard.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use nautilus_model::{
19    data::{BarType, DataType},
20    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
21};
22use ustr::Ustr;
23
24use crate::msgbus::get_message_bus;
25
26#[must_use]
27pub fn get_custom_topic(data_type: &DataType) -> Ustr {
28    get_message_bus()
29        .borrow_mut()
30        .switchboard
31        .get_custom_topic(data_type)
32}
33
34#[must_use]
35pub fn get_instruments_topic(venue: Venue) -> Ustr {
36    get_message_bus()
37        .borrow_mut()
38        .switchboard
39        .get_instruments_topic(venue)
40}
41
42#[must_use]
43pub fn get_instrument_topic(instrument_id: InstrumentId) -> Ustr {
44    get_message_bus()
45        .borrow_mut()
46        .switchboard
47        .get_instrument_topic(instrument_id)
48}
49
50#[must_use]
51pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> Ustr {
52    get_message_bus()
53        .borrow_mut()
54        .switchboard
55        .get_book_deltas_topic(instrument_id)
56}
57
58#[must_use]
59pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> Ustr {
60    get_message_bus()
61        .borrow_mut()
62        .switchboard
63        .get_book_depth10_topic(instrument_id)
64}
65
66#[must_use]
67pub fn get_book_snapshots_topic(instrument_id: InstrumentId) -> Ustr {
68    get_message_bus()
69        .borrow_mut()
70        .switchboard
71        .get_book_snapshots_topic(instrument_id)
72}
73
74#[must_use]
75pub fn get_quotes_topic(instrument_id: InstrumentId) -> Ustr {
76    get_message_bus()
77        .borrow_mut()
78        .switchboard
79        .get_quotes_topic(instrument_id)
80}
81
82#[must_use]
83pub fn get_trades_topic(instrument_id: InstrumentId) -> Ustr {
84    get_message_bus()
85        .borrow_mut()
86        .switchboard
87        .get_trades_topic(instrument_id)
88}
89
90#[must_use]
91pub fn get_bars_topic(bar_type: BarType) -> Ustr {
92    get_message_bus()
93        .borrow_mut()
94        .switchboard
95        .get_bars_topic(bar_type)
96}
97
98#[must_use]
99pub fn get_mark_price_topic(instrument_id: InstrumentId) -> Ustr {
100    get_message_bus()
101        .borrow_mut()
102        .switchboard
103        .get_mark_price_topic(instrument_id)
104}
105
106#[must_use]
107pub fn get_index_price_topic(instrument_id: InstrumentId) -> Ustr {
108    get_message_bus()
109        .borrow_mut()
110        .switchboard
111        .get_index_price_topic(instrument_id)
112}
113
114#[must_use]
115pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> Ustr {
116    get_message_bus()
117        .borrow_mut()
118        .switchboard
119        .get_instrument_status_topic(instrument_id)
120}
121
122#[must_use]
123pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> Ustr {
124    get_message_bus()
125        .borrow_mut()
126        .switchboard
127        .get_instrument_close_topic(instrument_id)
128}
129
130#[must_use]
131pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> Ustr {
132    get_message_bus()
133        .borrow_mut()
134        .switchboard
135        .get_order_snapshots_topic(client_order_id)
136}
137
138#[must_use]
139pub fn get_positions_snapshots_topic(position_id: PositionId) -> Ustr {
140    get_message_bus()
141        .borrow_mut()
142        .switchboard
143        .get_positions_snapshots_topic(position_id)
144}
145
146#[must_use]
147pub fn get_event_orders_topic(strategy_id: StrategyId) -> Ustr {
148    get_message_bus()
149        .borrow_mut()
150        .switchboard
151        .get_event_orders_topic(strategy_id)
152}
153
154#[must_use]
155pub fn get_event_positions_topic(strategy_id: StrategyId) -> Ustr {
156    get_message_bus()
157        .borrow_mut()
158        .switchboard
159        .get_event_positions_topic(strategy_id)
160}
161
162/// Represents a switchboard of built-in messaging endpoint names.
163#[derive(Clone, Debug)]
164pub struct MessagingSwitchboard {
165    custom_topics: HashMap<DataType, Ustr>,
166    instruments_topics: HashMap<Venue, Ustr>,
167    instrument_topics: HashMap<InstrumentId, Ustr>,
168    book_deltas_topics: HashMap<InstrumentId, Ustr>,
169    book_depth10_topics: HashMap<InstrumentId, Ustr>,
170    book_snapshots_topics: HashMap<InstrumentId, Ustr>,
171    quote_topics: HashMap<InstrumentId, Ustr>,
172    trade_topics: HashMap<InstrumentId, Ustr>,
173    bar_topics: HashMap<BarType, Ustr>,
174    mark_price_topics: HashMap<InstrumentId, Ustr>,
175    index_price_topics: HashMap<InstrumentId, Ustr>,
176    instrument_status_topics: HashMap<InstrumentId, Ustr>,
177    instrument_close_topics: HashMap<InstrumentId, Ustr>,
178    event_orders_topics: HashMap<StrategyId, Ustr>,
179    event_positions_topics: HashMap<StrategyId, Ustr>,
180    order_snapshots_topics: HashMap<ClientOrderId, Ustr>,
181    positions_snapshots_topics: HashMap<PositionId, Ustr>,
182}
183
184impl Default for MessagingSwitchboard {
185    /// Creates a new default [`MessagingSwitchboard`] instance.
186    fn default() -> Self {
187        Self {
188            custom_topics: HashMap::new(),
189            instruments_topics: HashMap::new(),
190            instrument_topics: HashMap::new(),
191            book_deltas_topics: HashMap::new(),
192            book_snapshots_topics: HashMap::new(),
193            book_depth10_topics: HashMap::new(),
194            quote_topics: HashMap::new(),
195            trade_topics: HashMap::new(),
196            mark_price_topics: HashMap::new(),
197            index_price_topics: HashMap::new(),
198            bar_topics: HashMap::new(),
199            instrument_status_topics: HashMap::new(),
200            instrument_close_topics: HashMap::new(),
201            order_snapshots_topics: HashMap::new(),
202            event_orders_topics: HashMap::new(),
203            event_positions_topics: HashMap::new(),
204            positions_snapshots_topics: HashMap::new(),
205        }
206    }
207}
208
209impl MessagingSwitchboard {
210    #[must_use]
211    pub fn data_engine_execute() -> Ustr {
212        Ustr::from("DataEngine.execute")
213    }
214
215    #[must_use]
216    pub fn data_engine_process() -> Ustr {
217        Ustr::from("DataEngine.process")
218    }
219
220    #[must_use]
221    pub fn exec_engine_execute() -> Ustr {
222        Ustr::from("ExecEngine.execute")
223    }
224
225    #[must_use]
226    pub fn exec_engine_process() -> Ustr {
227        Ustr::from("ExecEngine.process")
228    }
229
230    #[must_use]
231    pub fn get_custom_topic(&mut self, data_type: &DataType) -> Ustr {
232        *self
233            .custom_topics
234            .entry(data_type.clone())
235            .or_insert_with(|| Ustr::from(&format!("data.{}", data_type.topic())))
236    }
237
238    #[must_use]
239    pub fn get_instruments_topic(&mut self, venue: Venue) -> Ustr {
240        *self
241            .instruments_topics
242            .entry(venue)
243            .or_insert_with(|| Ustr::from(&format!("data.instrument.{}", venue)))
244    }
245
246    #[must_use]
247    pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
248        *self
249            .instrument_topics
250            .entry(instrument_id)
251            .or_insert_with(|| {
252                Ustr::from(&format!(
253                    "data.instrument.{}.{}",
254                    instrument_id.venue, instrument_id.symbol
255                ))
256            })
257    }
258
259    #[must_use]
260    pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
261        *self
262            .book_deltas_topics
263            .entry(instrument_id)
264            .or_insert_with(|| {
265                Ustr::from(&format!(
266                    "data.book.deltas.{}.{}",
267                    instrument_id.venue, instrument_id.symbol
268                ))
269            })
270    }
271
272    #[must_use]
273    pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
274        *self
275            .book_depth10_topics
276            .entry(instrument_id)
277            .or_insert_with(|| {
278                Ustr::from(&format!(
279                    "data.book.depth10.{}.{}",
280                    instrument_id.venue, instrument_id.symbol
281                ))
282            })
283    }
284
285    #[must_use]
286    pub fn get_book_snapshots_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
287        *self
288            .book_snapshots_topics
289            .entry(instrument_id)
290            .or_insert_with(|| {
291                Ustr::from(&format!(
292                    "data.book.snapshots.{}.{}",
293                    instrument_id.venue, instrument_id.symbol
294                ))
295            })
296    }
297
298    #[must_use]
299    pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
300        *self.quote_topics.entry(instrument_id).or_insert_with(|| {
301            Ustr::from(&format!(
302                "data.quotes.{}.{}",
303                instrument_id.venue, instrument_id.symbol
304            ))
305        })
306    }
307
308    #[must_use]
309    pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
310        *self.trade_topics.entry(instrument_id).or_insert_with(|| {
311            Ustr::from(&format!(
312                "data.trades.{}.{}",
313                instrument_id.venue, instrument_id.symbol
314            ))
315        })
316    }
317
318    #[must_use]
319    pub fn get_bars_topic(&mut self, bar_type: BarType) -> Ustr {
320        *self
321            .bar_topics
322            .entry(bar_type)
323            .or_insert_with(|| Ustr::from(&format!("data.bars.{bar_type}")))
324    }
325
326    #[must_use]
327    pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
328        *self
329            .mark_price_topics
330            .entry(instrument_id)
331            .or_insert_with(|| {
332                Ustr::from(&format!(
333                    "data.mark_prices.{}.{}",
334                    instrument_id.venue, instrument_id.symbol
335                ))
336            })
337    }
338
339    #[must_use]
340    pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
341        *self
342            .index_price_topics
343            .entry(instrument_id)
344            .or_insert_with(|| {
345                Ustr::from(&format!(
346                    "data.index_prices.{}.{}",
347                    instrument_id.venue, instrument_id.symbol
348                ))
349            })
350    }
351
352    #[must_use]
353    pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
354        *self
355            .instrument_status_topics
356            .entry(instrument_id)
357            .or_insert_with(|| {
358                Ustr::from(&format!(
359                    "data.status.{}.{}",
360                    instrument_id.venue, instrument_id.symbol
361                ))
362            })
363    }
364
365    #[must_use]
366    pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
367        *self
368            .instrument_close_topics
369            .entry(instrument_id)
370            .or_insert_with(|| {
371                Ustr::from(&format!(
372                    "data.close.{}.{}",
373                    instrument_id.venue, instrument_id.symbol
374                ))
375            })
376    }
377
378    #[must_use]
379    pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> Ustr {
380        *self
381            .order_snapshots_topics
382            .entry(client_order_id)
383            .or_insert_with(|| Ustr::from(&format!("order.snapshots.{client_order_id}")))
384    }
385
386    #[must_use]
387    pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> Ustr {
388        *self
389            .positions_snapshots_topics
390            .entry(position_id)
391            .or_insert_with(|| Ustr::from(&format!("positions.snapshots.{position_id}")))
392    }
393
394    #[must_use]
395    pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> Ustr {
396        *self
397            .event_orders_topics
398            .entry(strategy_id)
399            .or_insert_with(|| Ustr::from(&format!("events.order.{strategy_id}")))
400    }
401
402    #[must_use]
403    pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> Ustr {
404        *self
405            .event_positions_topics
406            .entry(strategy_id)
407            .or_insert_with(|| Ustr::from(&format!("events.position.{strategy_id}")))
408    }
409}
410
411////////////////////////////////////////////////////////////////////////////////
412// Tests
413////////////////////////////////////////////////////////////////////////////////
414#[cfg(test)]
415mod tests {
416    use nautilus_model::{
417        data::{BarType, DataType},
418        identifiers::InstrumentId,
419    };
420    use rstest::*;
421
422    use super::*;
423
424    #[fixture]
425    fn switchboard() -> MessagingSwitchboard {
426        MessagingSwitchboard::default()
427    }
428
429    #[fixture]
430    fn instrument_id() -> InstrumentId {
431        InstrumentId::from("ESZ24.XCME")
432    }
433
434    #[rstest]
435    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
436        let data_type = DataType::new("ExampleDataType", None);
437        let expected_topic = Ustr::from("data.ExampleDataType");
438        let result = switchboard.get_custom_topic(&data_type);
439        assert_eq!(result, expected_topic);
440        assert!(switchboard.custom_topics.contains_key(&data_type));
441    }
442
443    #[rstest]
444    fn test_get_instrument_topic(
445        mut switchboard: MessagingSwitchboard,
446        instrument_id: InstrumentId,
447    ) {
448        let expected_topic = Ustr::from("data.instrument.XCME.ESZ24");
449        let result = switchboard.get_instrument_topic(instrument_id);
450        assert_eq!(result, expected_topic);
451        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
452    }
453
454    #[rstest]
455    fn test_get_book_deltas_topic(
456        mut switchboard: MessagingSwitchboard,
457        instrument_id: InstrumentId,
458    ) {
459        let expected_topic = Ustr::from("data.book.deltas.XCME.ESZ24");
460        let result = switchboard.get_book_deltas_topic(instrument_id);
461        assert_eq!(result, expected_topic);
462        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
463    }
464
465    #[rstest]
466    fn test_get_book_depth10_topic(
467        mut switchboard: MessagingSwitchboard,
468        instrument_id: InstrumentId,
469    ) {
470        let expected_topic = Ustr::from("data.book.depth10.XCME.ESZ24");
471        let result = switchboard.get_book_depth10_topic(instrument_id);
472        assert_eq!(result, expected_topic);
473        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
474    }
475
476    #[rstest]
477    fn test_get_book_snapshots_topic(
478        mut switchboard: MessagingSwitchboard,
479        instrument_id: InstrumentId,
480    ) {
481        let expected_topic = Ustr::from("data.book.snapshots.XCME.ESZ24");
482        let result = switchboard.get_book_snapshots_topic(instrument_id);
483        assert_eq!(result, expected_topic);
484        assert!(
485            switchboard
486                .book_snapshots_topics
487                .contains_key(&instrument_id)
488        );
489    }
490
491    #[rstest]
492    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
493        let expected_topic = Ustr::from("data.quotes.XCME.ESZ24");
494        let result = switchboard.get_quotes_topic(instrument_id);
495        assert_eq!(result, expected_topic);
496        assert!(switchboard.quote_topics.contains_key(&instrument_id));
497    }
498
499    #[rstest]
500    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
501        let expected_topic = Ustr::from("data.trades.XCME.ESZ24");
502        let result = switchboard.get_trades_topic(instrument_id);
503        assert_eq!(result, expected_topic);
504        assert!(switchboard.trade_topics.contains_key(&instrument_id));
505    }
506
507    #[rstest]
508    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
509        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
510        let expected_topic = Ustr::from(&format!("data.bars.{bar_type}"));
511        let result = switchboard.get_bars_topic(bar_type);
512        assert_eq!(result, expected_topic);
513        assert!(switchboard.bar_topics.contains_key(&bar_type));
514    }
515
516    #[rstest]
517    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
518        let client_order_id = ClientOrderId::from("O-123456789");
519        let expected_topic = Ustr::from(&format!("order.snapshots.{client_order_id}"));
520        let result = switchboard.get_order_snapshots_topic(client_order_id);
521        assert_eq!(result, expected_topic);
522        assert!(
523            switchboard
524                .order_snapshots_topics
525                .contains_key(&client_order_id)
526        );
527    }
528}