nautilus_common/msgbus/
switchboard.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use nautilus_model::{
19    data::{BarType, DataType},
20    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
21};
22use ustr::Ustr;
23
24/// Represents a switchboard of built-in messaging endpoint names.
25#[derive(Clone, Debug)]
26pub struct MessagingSwitchboard {
27    pub data_engine_execute: Ustr,
28    pub data_engine_process: Ustr,
29    pub exec_engine_execute: Ustr,
30    pub exec_engine_process: Ustr,
31    custom_topics: HashMap<DataType, Ustr>,
32    instrument_topics: HashMap<InstrumentId, Ustr>,
33    deltas_topics: HashMap<InstrumentId, Ustr>,
34    book_snapshots_topics: HashMap<InstrumentId, Ustr>,
35    event_orders_topics: HashMap<StrategyId, Ustr>,
36    event_positions_topics: HashMap<StrategyId, Ustr>,
37    depth_topics: HashMap<InstrumentId, Ustr>,
38    quote_topics: HashMap<InstrumentId, Ustr>,
39    trade_topics: HashMap<InstrumentId, Ustr>,
40    bar_topics: HashMap<BarType, Ustr>,
41    order_snapshots_topics: HashMap<ClientOrderId, Ustr>,
42    positions_snapshots_topics: HashMap<PositionId, Ustr>,
43}
44
45impl Default for MessagingSwitchboard {
46    /// Creates a new default [`MessagingSwitchboard`] instance.
47    fn default() -> Self {
48        Self {
49            data_engine_execute: Ustr::from("DataEngine.execute"),
50            data_engine_process: Ustr::from("DataEngine.process"),
51            exec_engine_execute: Ustr::from("ExecEngine.execute"),
52            exec_engine_process: Ustr::from("ExecEngine.process"),
53            custom_topics: HashMap::new(),
54            instrument_topics: HashMap::new(),
55            deltas_topics: HashMap::new(),
56            book_snapshots_topics: HashMap::new(),
57            depth_topics: HashMap::new(),
58            quote_topics: HashMap::new(),
59            trade_topics: HashMap::new(),
60            bar_topics: HashMap::new(),
61            order_snapshots_topics: HashMap::new(),
62            event_orders_topics: HashMap::new(),
63            event_positions_topics: HashMap::new(),
64            positions_snapshots_topics: HashMap::new(),
65        }
66    }
67}
68
69impl MessagingSwitchboard {
70    #[must_use]
71    pub fn get_custom_topic(&mut self, data_type: &DataType) -> Ustr {
72        *self
73            .custom_topics
74            .entry(data_type.clone())
75            .or_insert_with(|| Ustr::from(&format!("data.{}", data_type.topic())))
76    }
77
78    #[must_use]
79    pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
80        *self
81            .instrument_topics
82            .entry(instrument_id)
83            .or_insert_with(|| {
84                Ustr::from(&format!(
85                    "data.instrument.{}.{}",
86                    instrument_id.venue, instrument_id.symbol
87                ))
88            })
89    }
90
91    #[must_use]
92    pub fn get_deltas_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
93        *self.deltas_topics.entry(instrument_id).or_insert_with(|| {
94            Ustr::from(&format!(
95                "data.book.deltas.{}.{}",
96                instrument_id.venue, instrument_id.symbol
97            ))
98        })
99    }
100
101    #[must_use]
102    pub fn get_depth_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
103        *self.depth_topics.entry(instrument_id).or_insert_with(|| {
104            Ustr::from(&format!(
105                "data.book.depth.{}.{}",
106                instrument_id.venue, instrument_id.symbol
107            ))
108        })
109    }
110
111    #[must_use]
112    pub fn get_book_snapshots_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
113        *self
114            .book_snapshots_topics
115            .entry(instrument_id)
116            .or_insert_with(|| {
117                Ustr::from(&format!(
118                    "data.book.snapshots.{}.{}",
119                    instrument_id.venue, instrument_id.symbol
120                ))
121            })
122    }
123
124    #[must_use]
125    pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
126        *self.quote_topics.entry(instrument_id).or_insert_with(|| {
127            Ustr::from(&format!(
128                "data.quotes.{}.{}",
129                instrument_id.venue, instrument_id.symbol
130            ))
131        })
132    }
133
134    #[must_use]
135    pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
136        *self.trade_topics.entry(instrument_id).or_insert_with(|| {
137            Ustr::from(&format!(
138                "data.trades.{}.{}",
139                instrument_id.venue, instrument_id.symbol
140            ))
141        })
142    }
143
144    #[must_use]
145    pub fn get_bars_topic(&mut self, bar_type: BarType) -> Ustr {
146        *self
147            .bar_topics
148            .entry(bar_type)
149            .or_insert_with(|| Ustr::from(&format!("data.bars.{bar_type}")))
150    }
151
152    #[must_use]
153    pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> Ustr {
154        *self
155            .order_snapshots_topics
156            .entry(client_order_id)
157            .or_insert_with(|| Ustr::from(&format!("order.snapshots.{client_order_id}")))
158    }
159
160    #[must_use]
161    pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> Ustr {
162        *self
163            .positions_snapshots_topics
164            .entry(position_id)
165            .or_insert_with(|| Ustr::from(&format!("positions.snapshots.{position_id}")))
166    }
167
168    #[must_use]
169    pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> Ustr {
170        *self
171            .event_orders_topics
172            .entry(strategy_id)
173            .or_insert_with(|| Ustr::from(&format!("events.order.{strategy_id}")))
174    }
175
176    #[must_use]
177    pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> Ustr {
178        *self
179            .event_positions_topics
180            .entry(strategy_id)
181            .or_insert_with(|| Ustr::from(&format!("events.position.{strategy_id}")))
182    }
183}
184
185////////////////////////////////////////////////////////////////////////////////
186// Tests
187////////////////////////////////////////////////////////////////////////////////
/// Unit tests for [`MessagingSwitchboard`]: every topic accessor must return
/// the expected topic string and store it in the matching cache.
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::{BarType, DataType},
        identifiers::InstrumentId,
    };
    use rstest::*;

    use super::*;

    /// A fresh switchboard with empty topic caches.
    #[fixture]
    fn switchboard() -> MessagingSwitchboard {
        MessagingSwitchboard::default()
    }

    /// The instrument ID shared by the instrument-keyed topic tests.
    #[fixture]
    fn instrument_id() -> InstrumentId {
        InstrumentId::from("ESZ24.XCME")
    }

    #[rstest]
    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
        let data_type = DataType::new("ExampleDataType", None);
        let expected_topic = Ustr::from("data.ExampleDataType");
        let result = switchboard.get_custom_topic(&data_type);
        assert_eq!(result, expected_topic);
        assert!(switchboard.custom_topics.contains_key(&data_type));
    }

    #[rstest]
    fn test_get_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = Ustr::from("data.instrument.XCME.ESZ24");
        let result = switchboard.get_instrument_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_deltas_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected_topic = Ustr::from("data.book.deltas.XCME.ESZ24");
        let result = switchboard.get_deltas_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.deltas_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_snapshots_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = Ustr::from("data.book.snapshots.XCME.ESZ24");
        let result = switchboard.get_book_snapshots_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard
            .book_snapshots_topics
            .contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_depth_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected_topic = Ustr::from("data.book.depth.XCME.ESZ24");
        let result = switchboard.get_depth_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.depth_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected_topic = Ustr::from("data.quotes.XCME.ESZ24");
        let result = switchboard.get_quotes_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.quote_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected_topic = Ustr::from("data.trades.XCME.ESZ24");
        let result = switchboard.get_trades_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.trade_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
        let expected_topic = Ustr::from(&format!("data.bars.{bar_type}"));
        let result = switchboard.get_bars_topic(bar_type);
        assert_eq!(result, expected_topic);
        assert!(switchboard.bar_topics.contains_key(&bar_type));
    }

    #[rstest]
    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
        let client_order_id = ClientOrderId::from("O-123456789");
        let expected_topic = Ustr::from(&format!("order.snapshots.{client_order_id}"));
        let result = switchboard.get_order_snapshots_topic(client_order_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard
            .order_snapshots_topics
            .contains_key(&client_order_id));
    }

    // The three accessors below previously had no coverage; these tests mirror
    // the shape of the existing ones.

    #[rstest]
    fn test_get_positions_snapshots_topic(mut switchboard: MessagingSwitchboard) {
        let position_id = PositionId::from("P-123456789");
        let expected_topic = Ustr::from(&format!("positions.snapshots.{position_id}"));
        let result = switchboard.get_positions_snapshots_topic(position_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard
            .positions_snapshots_topics
            .contains_key(&position_id));
    }

    #[rstest]
    fn test_get_event_orders_topic(mut switchboard: MessagingSwitchboard) {
        let strategy_id = StrategyId::from("S-001");
        let expected_topic = Ustr::from(&format!("events.order.{strategy_id}"));
        let result = switchboard.get_event_orders_topic(strategy_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.event_orders_topics.contains_key(&strategy_id));
    }

    #[rstest]
    fn test_get_event_positions_topic(mut switchboard: MessagingSwitchboard) {
        let strategy_id = StrategyId::from("S-001");
        let expected_topic = Ustr::from(&format!("events.position.{strategy_id}"));
        let result = switchboard.get_event_positions_topic(strategy_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.event_positions_topics.contains_key(&strategy_id));
    }
}