nautilus_common/msgbus/
switchboard.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use nautilus_model::{
19    data::{BarType, DataType},
20    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
21};
22
23use super::core::{Endpoint, MStr, Topic};
24use crate::msgbus::get_message_bus;
25
/// Topic string literal `"CLOSE"`.
///
/// NOTE(review): the publishers/subscribers of this topic are not visible in this file —
/// confirm its intended use against the callers.
pub const CLOSE_TOPIC: &str = "CLOSE";
27
28#[must_use]
29pub fn get_custom_topic(data_type: &DataType) -> MStr<Topic> {
30    get_message_bus()
31        .borrow_mut()
32        .switchboard
33        .get_custom_topic(data_type)
34}
35
36#[must_use]
37pub fn get_instruments_topic(venue: Venue) -> MStr<Topic> {
38    get_message_bus()
39        .borrow_mut()
40        .switchboard
41        .get_instruments_topic(venue)
42}
43
44#[must_use]
45pub fn get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic> {
46    get_message_bus()
47        .borrow_mut()
48        .switchboard
49        .get_instrument_topic(instrument_id)
50}
51
52#[must_use]
53pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic> {
54    get_message_bus()
55        .borrow_mut()
56        .switchboard
57        .get_book_deltas_topic(instrument_id)
58}
59
60#[must_use]
61pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic> {
62    get_message_bus()
63        .borrow_mut()
64        .switchboard
65        .get_book_depth10_topic(instrument_id)
66}
67
68#[must_use]
69pub fn get_book_snapshots_topic(instrument_id: InstrumentId) -> MStr<Topic> {
70    get_message_bus()
71        .borrow_mut()
72        .switchboard
73        .get_book_snapshots_topic(instrument_id)
74}
75
76#[must_use]
77pub fn get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic> {
78    get_message_bus()
79        .borrow_mut()
80        .switchboard
81        .get_quotes_topic(instrument_id)
82}
83
84#[must_use]
85pub fn get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic> {
86    get_message_bus()
87        .borrow_mut()
88        .switchboard
89        .get_trades_topic(instrument_id)
90}
91
92#[must_use]
93pub fn get_bars_topic(bar_type: BarType) -> MStr<Topic> {
94    get_message_bus()
95        .borrow_mut()
96        .switchboard
97        .get_bars_topic(bar_type)
98}
99
100#[must_use]
101pub fn get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
102    get_message_bus()
103        .borrow_mut()
104        .switchboard
105        .get_mark_price_topic(instrument_id)
106}
107
108#[must_use]
109pub fn get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
110    get_message_bus()
111        .borrow_mut()
112        .switchboard
113        .get_index_price_topic(instrument_id)
114}
115
116#[must_use]
117pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic> {
118    get_message_bus()
119        .borrow_mut()
120        .switchboard
121        .get_instrument_status_topic(instrument_id)
122}
123
124#[must_use]
125pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic> {
126    get_message_bus()
127        .borrow_mut()
128        .switchboard
129        .get_instrument_close_topic(instrument_id)
130}
131
132#[must_use]
133pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic> {
134    get_message_bus()
135        .borrow_mut()
136        .switchboard
137        .get_order_snapshots_topic(client_order_id)
138}
139
140#[must_use]
141pub fn get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic> {
142    get_message_bus()
143        .borrow_mut()
144        .switchboard
145        .get_positions_snapshots_topic(position_id)
146}
147
148#[must_use]
149pub fn get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic> {
150    get_message_bus()
151        .borrow_mut()
152        .switchboard
153        .get_event_orders_topic(strategy_id)
154}
155
156#[must_use]
157pub fn get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic> {
158    get_message_bus()
159        .borrow_mut()
160        .switchboard
161        .get_event_positions_topic(strategy_id)
162}
163
164/// Represents a switchboard of built-in messaging endpoint names.
165#[derive(Clone, Debug)]
166pub struct MessagingSwitchboard {
167    custom_topics: HashMap<DataType, MStr<Topic>>,
168    instruments_topics: HashMap<Venue, MStr<Topic>>,
169    instrument_topics: HashMap<InstrumentId, MStr<Topic>>,
170    book_deltas_topics: HashMap<InstrumentId, MStr<Topic>>,
171    book_depth10_topics: HashMap<InstrumentId, MStr<Topic>>,
172    book_snapshots_topics: HashMap<InstrumentId, MStr<Topic>>,
173    quote_topics: HashMap<InstrumentId, MStr<Topic>>,
174    trade_topics: HashMap<InstrumentId, MStr<Topic>>,
175    bar_topics: HashMap<BarType, MStr<Topic>>,
176    mark_price_topics: HashMap<InstrumentId, MStr<Topic>>,
177    index_price_topics: HashMap<InstrumentId, MStr<Topic>>,
178    instrument_status_topics: HashMap<InstrumentId, MStr<Topic>>,
179    instrument_close_topics: HashMap<InstrumentId, MStr<Topic>>,
180    event_orders_topics: HashMap<StrategyId, MStr<Topic>>,
181    event_positions_topics: HashMap<StrategyId, MStr<Topic>>,
182    order_snapshots_topics: HashMap<ClientOrderId, MStr<Topic>>,
183    positions_snapshots_topics: HashMap<PositionId, MStr<Topic>>,
184}
185
186impl Default for MessagingSwitchboard {
187    /// Creates a new default [`MessagingSwitchboard`] instance.
188    fn default() -> Self {
189        Self {
190            custom_topics: HashMap::new(),
191            instruments_topics: HashMap::new(),
192            instrument_topics: HashMap::new(),
193            book_deltas_topics: HashMap::new(),
194            book_snapshots_topics: HashMap::new(),
195            book_depth10_topics: HashMap::new(),
196            quote_topics: HashMap::new(),
197            trade_topics: HashMap::new(),
198            mark_price_topics: HashMap::new(),
199            index_price_topics: HashMap::new(),
200            bar_topics: HashMap::new(),
201            instrument_status_topics: HashMap::new(),
202            instrument_close_topics: HashMap::new(),
203            order_snapshots_topics: HashMap::new(),
204            event_orders_topics: HashMap::new(),
205            event_positions_topics: HashMap::new(),
206            positions_snapshots_topics: HashMap::new(),
207        }
208    }
209}
210
211impl MessagingSwitchboard {
212    #[must_use]
213    pub fn data_engine_execute() -> MStr<Endpoint> {
214        "DataEngine.execute".into()
215    }
216
217    #[must_use]
218    pub fn data_engine_process() -> MStr<Endpoint> {
219        "DataEngine.process".into()
220    }
221
222    #[must_use]
223    pub fn exec_engine_execute() -> MStr<Endpoint> {
224        "ExecEngine.execute".into()
225    }
226
227    #[must_use]
228    pub fn exec_engine_process() -> MStr<Endpoint> {
229        "ExecEngine.process".into()
230    }
231
232    #[must_use]
233    pub fn get_custom_topic(&mut self, data_type: &DataType) -> MStr<Topic> {
234        *self
235            .custom_topics
236            .entry(data_type.clone())
237            .or_insert_with(|| format!("data.{}", data_type.topic()).into())
238    }
239
240    #[must_use]
241    pub fn get_instruments_topic(&mut self, venue: Venue) -> MStr<Topic> {
242        *self
243            .instruments_topics
244            .entry(venue)
245            .or_insert_with(|| format!("data.instrument.{}", venue).into())
246    }
247
248    #[must_use]
249    pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
250        *self
251            .instrument_topics
252            .entry(instrument_id)
253            .or_insert_with(|| {
254                format!(
255                    "data.instrument.{}.{}",
256                    instrument_id.venue, instrument_id.symbol
257                )
258                .into()
259            })
260    }
261
262    #[must_use]
263    pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
264        *self
265            .book_deltas_topics
266            .entry(instrument_id)
267            .or_insert_with(|| {
268                format!(
269                    "data.book.deltas.{}.{}",
270                    instrument_id.venue, instrument_id.symbol
271                )
272                .into()
273            })
274    }
275
276    #[must_use]
277    pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
278        *self
279            .book_depth10_topics
280            .entry(instrument_id)
281            .or_insert_with(|| {
282                format!(
283                    "data.book.depth10.{}.{}",
284                    instrument_id.venue, instrument_id.symbol
285                )
286                .into()
287            })
288    }
289
290    #[must_use]
291    pub fn get_book_snapshots_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
292        *self
293            .book_snapshots_topics
294            .entry(instrument_id)
295            .or_insert_with(|| {
296                format!(
297                    "data.book.snapshots.{}.{}",
298                    instrument_id.venue, instrument_id.symbol
299                )
300                .into()
301            })
302    }
303
304    #[must_use]
305    pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
306        *self.quote_topics.entry(instrument_id).or_insert_with(|| {
307            format!(
308                "data.quotes.{}.{}",
309                instrument_id.venue, instrument_id.symbol
310            )
311            .into()
312        })
313    }
314
315    #[must_use]
316    pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
317        *self.trade_topics.entry(instrument_id).or_insert_with(|| {
318            format!(
319                "data.trades.{}.{}",
320                instrument_id.venue, instrument_id.symbol
321            )
322            .into()
323        })
324    }
325
326    #[must_use]
327    pub fn get_bars_topic(&mut self, bar_type: BarType) -> MStr<Topic> {
328        *self
329            .bar_topics
330            .entry(bar_type)
331            .or_insert_with(|| format!("data.bars.{bar_type}").into())
332    }
333
334    #[must_use]
335    pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
336        *self
337            .mark_price_topics
338            .entry(instrument_id)
339            .or_insert_with(|| {
340                format!(
341                    "data.mark_prices.{}.{}",
342                    instrument_id.venue, instrument_id.symbol
343                )
344                .into()
345            })
346    }
347
348    #[must_use]
349    pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
350        *self
351            .index_price_topics
352            .entry(instrument_id)
353            .or_insert_with(|| {
354                format!(
355                    "data.index_prices.{}.{}",
356                    instrument_id.venue, instrument_id.symbol
357                )
358                .into()
359            })
360    }
361
362    #[must_use]
363    pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
364        *self
365            .instrument_status_topics
366            .entry(instrument_id)
367            .or_insert_with(|| {
368                format!(
369                    "data.status.{}.{}",
370                    instrument_id.venue, instrument_id.symbol
371                )
372                .into()
373            })
374    }
375
376    #[must_use]
377    pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
378        *self
379            .instrument_close_topics
380            .entry(instrument_id)
381            .or_insert_with(|| {
382                format!(
383                    "data.close.{}.{}",
384                    instrument_id.venue, instrument_id.symbol
385                )
386                .into()
387            })
388    }
389
390    #[must_use]
391    pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> MStr<Topic> {
392        *self
393            .order_snapshots_topics
394            .entry(client_order_id)
395            .or_insert_with(|| format!("order.snapshots.{client_order_id}").into())
396    }
397
398    #[must_use]
399    pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> MStr<Topic> {
400        *self
401            .positions_snapshots_topics
402            .entry(position_id)
403            .or_insert_with(|| format!("positions.snapshots.{position_id}").into())
404    }
405
406    #[must_use]
407    pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
408        *self
409            .event_orders_topics
410            .entry(strategy_id)
411            .or_insert_with(|| format!("events.order.{strategy_id}").into())
412    }
413
414    #[must_use]
415    pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
416        *self
417            .event_positions_topics
418            .entry(strategy_id)
419            .or_insert_with(|| format!("events.position.{strategy_id}").into())
420    }
421}
422
423////////////////////////////////////////////////////////////////////////////////
424// Tests
425////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::{BarType, DataType},
        identifiers::InstrumentId,
    };
    use rstest::*;

    use super::*;

    // Fresh switchboard with empty topic caches for every test case.
    #[fixture]
    fn switchboard() -> MessagingSwitchboard {
        MessagingSwitchboard::default()
    }

    // Instrument with symbol `ESZ24` on venue `XCME`, shared by the instrument-keyed tests.
    #[fixture]
    fn instrument_id() -> InstrumentId {
        InstrumentId::from("ESZ24.XCME")
    }

    #[rstest]
    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
        let custom_type = DataType::new("ExampleDataType", None);
        let want: MStr<Topic> = "data.ExampleDataType".into();

        let got = switchboard.get_custom_topic(&custom_type);

        assert_eq!(got, want);
        assert!(switchboard.custom_topics.contains_key(&custom_type));
    }

    #[rstest]
    fn test_get_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let want: MStr<Topic> = "data.instrument.XCME.ESZ24".into();

        let got = switchboard.get_instrument_topic(instrument_id);

        assert_eq!(got, want);
        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_deltas_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let want: MStr<Topic> = "data.book.deltas.XCME.ESZ24".into();

        let got = switchboard.get_book_deltas_topic(instrument_id);

        assert_eq!(got, want);
        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_depth10_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let want: MStr<Topic> = "data.book.depth10.XCME.ESZ24".into();

        let got = switchboard.get_book_depth10_topic(instrument_id);

        assert_eq!(got, want);
        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_snapshots_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let want: MStr<Topic> = "data.book.snapshots.XCME.ESZ24".into();

        let got = switchboard.get_book_snapshots_topic(instrument_id);

        assert_eq!(got, want);
        let is_cached = switchboard
            .book_snapshots_topics
            .contains_key(&instrument_id);
        assert!(is_cached);
    }

    #[rstest]
    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let want: MStr<Topic> = "data.quotes.XCME.ESZ24".into();

        let got = switchboard.get_quotes_topic(instrument_id);

        assert_eq!(got, want);
        assert!(switchboard.quote_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let want: MStr<Topic> = "data.trades.XCME.ESZ24".into();

        let got = switchboard.get_trades_topic(instrument_id);

        assert_eq!(got, want);
        assert!(switchboard.trade_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
        let minute_bars = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
        let want: MStr<Topic> = format!("data.bars.{minute_bars}").into();

        let got = switchboard.get_bars_topic(minute_bars);

        assert_eq!(got, want);
        assert!(switchboard.bar_topics.contains_key(&minute_bars));
    }

    #[rstest]
    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
        let order_id = ClientOrderId::from("O-123456789");
        let want: MStr<Topic> = format!("order.snapshots.{order_id}").into();

        let got = switchboard.get_order_snapshots_topic(order_id);

        assert_eq!(got, want);
        let is_cached = switchboard.order_snapshots_topics.contains_key(&order_id);
        assert!(is_cached);
    }
}