1use std::collections::HashMap;
17
18use nautilus_model::{
19 data::{BarType, DataType},
20 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId},
21};
22use ustr::Ustr;
23
24#[derive(Clone, Debug)]
26pub struct MessagingSwitchboard {
27 pub data_engine_execute: Ustr,
28 pub data_engine_process: Ustr,
29 pub exec_engine_execute: Ustr,
30 pub exec_engine_process: Ustr,
31 custom_topics: HashMap<DataType, Ustr>,
32 instrument_topics: HashMap<InstrumentId, Ustr>,
33 deltas_topics: HashMap<InstrumentId, Ustr>,
34 book_snapshots_topics: HashMap<InstrumentId, Ustr>,
35 event_orders_topics: HashMap<StrategyId, Ustr>,
36 event_positions_topics: HashMap<StrategyId, Ustr>,
37 depth_topics: HashMap<InstrumentId, Ustr>,
38 quote_topics: HashMap<InstrumentId, Ustr>,
39 trade_topics: HashMap<InstrumentId, Ustr>,
40 bar_topics: HashMap<BarType, Ustr>,
41 order_snapshots_topics: HashMap<ClientOrderId, Ustr>,
42 positions_snapshots_topics: HashMap<PositionId, Ustr>,
43}
44
45impl Default for MessagingSwitchboard {
46 fn default() -> Self {
48 Self {
49 data_engine_execute: Ustr::from("DataEngine.execute"),
50 data_engine_process: Ustr::from("DataEngine.process"),
51 exec_engine_execute: Ustr::from("ExecEngine.execute"),
52 exec_engine_process: Ustr::from("ExecEngine.process"),
53 custom_topics: HashMap::new(),
54 instrument_topics: HashMap::new(),
55 deltas_topics: HashMap::new(),
56 book_snapshots_topics: HashMap::new(),
57 depth_topics: HashMap::new(),
58 quote_topics: HashMap::new(),
59 trade_topics: HashMap::new(),
60 bar_topics: HashMap::new(),
61 order_snapshots_topics: HashMap::new(),
62 event_orders_topics: HashMap::new(),
63 event_positions_topics: HashMap::new(),
64 positions_snapshots_topics: HashMap::new(),
65 }
66 }
67}
68
69impl MessagingSwitchboard {
70 #[must_use]
71 pub fn get_custom_topic(&mut self, data_type: &DataType) -> Ustr {
72 *self
73 .custom_topics
74 .entry(data_type.clone())
75 .or_insert_with(|| Ustr::from(&format!("data.{}", data_type.topic())))
76 }
77
78 #[must_use]
79 pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
80 *self
81 .instrument_topics
82 .entry(instrument_id)
83 .or_insert_with(|| {
84 Ustr::from(&format!(
85 "data.instrument.{}.{}",
86 instrument_id.venue, instrument_id.symbol
87 ))
88 })
89 }
90
91 #[must_use]
92 pub fn get_deltas_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
93 *self.deltas_topics.entry(instrument_id).or_insert_with(|| {
94 Ustr::from(&format!(
95 "data.book.deltas.{}.{}",
96 instrument_id.venue, instrument_id.symbol
97 ))
98 })
99 }
100
101 #[must_use]
102 pub fn get_depth_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
103 *self.depth_topics.entry(instrument_id).or_insert_with(|| {
104 Ustr::from(&format!(
105 "data.book.depth.{}.{}",
106 instrument_id.venue, instrument_id.symbol
107 ))
108 })
109 }
110
111 #[must_use]
112 pub fn get_book_snapshots_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
113 *self
114 .book_snapshots_topics
115 .entry(instrument_id)
116 .or_insert_with(|| {
117 Ustr::from(&format!(
118 "data.book.snapshots.{}.{}",
119 instrument_id.venue, instrument_id.symbol
120 ))
121 })
122 }
123
124 #[must_use]
125 pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
126 *self.quote_topics.entry(instrument_id).or_insert_with(|| {
127 Ustr::from(&format!(
128 "data.quotes.{}.{}",
129 instrument_id.venue, instrument_id.symbol
130 ))
131 })
132 }
133
134 #[must_use]
135 pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
136 *self.trade_topics.entry(instrument_id).or_insert_with(|| {
137 Ustr::from(&format!(
138 "data.trades.{}.{}",
139 instrument_id.venue, instrument_id.symbol
140 ))
141 })
142 }
143
144 #[must_use]
145 pub fn get_bars_topic(&mut self, bar_type: BarType) -> Ustr {
146 *self
147 .bar_topics
148 .entry(bar_type)
149 .or_insert_with(|| Ustr::from(&format!("data.bars.{bar_type}")))
150 }
151
152 #[must_use]
153 pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> Ustr {
154 *self
155 .order_snapshots_topics
156 .entry(client_order_id)
157 .or_insert_with(|| Ustr::from(&format!("order.snapshots.{client_order_id}")))
158 }
159
160 #[must_use]
161 pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> Ustr {
162 *self
163 .positions_snapshots_topics
164 .entry(position_id)
165 .or_insert_with(|| Ustr::from(&format!("positions.snapshots.{position_id}")))
166 }
167
168 #[must_use]
169 pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> Ustr {
170 *self
171 .event_orders_topics
172 .entry(strategy_id)
173 .or_insert_with(|| Ustr::from(&format!("events.order.{strategy_id}")))
174 }
175
176 #[must_use]
177 pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> Ustr {
178 *self
179 .event_positions_topics
180 .entry(strategy_id)
181 .or_insert_with(|| Ustr::from(&format!("events.position.{strategy_id}")))
182 }
183}
184
/// Unit tests for the topic accessors: each test checks the returned topic string
/// and that the corresponding cache now holds an entry for the key.
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::{BarType, DataType},
        identifiers::InstrumentId,
    };
    use rstest::*;

    use super::*;

    #[fixture]
    fn switchboard() -> MessagingSwitchboard {
        MessagingSwitchboard::default()
    }

    #[fixture]
    fn instrument_id() -> InstrumentId {
        InstrumentId::from("ESZ24.XCME")
    }

    #[rstest]
    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
        let data_type = DataType::new("ExampleDataType", None);
        let expected_topic = Ustr::from("data.ExampleDataType");
        let result = switchboard.get_custom_topic(&data_type);
        assert_eq!(result, expected_topic);
        assert!(switchboard.custom_topics.contains_key(&data_type));
    }

    #[rstest]
    fn test_get_custom_topic_repeated_lookup_reuses_entry(mut switchboard: MessagingSwitchboard) {
        // A second lookup must return the same topic without adding a second entry.
        let data_type = DataType::new("ExampleDataType", None);
        let first = switchboard.get_custom_topic(&data_type);
        let second = switchboard.get_custom_topic(&data_type);
        assert_eq!(first, second);
        assert_eq!(switchboard.custom_topics.len(), 1);
    }

    #[rstest]
    fn test_get_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = Ustr::from("data.instrument.XCME.ESZ24");
        let result = switchboard.get_instrument_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_deltas_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected_topic = Ustr::from("data.book.deltas.XCME.ESZ24");
        let result = switchboard.get_deltas_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.deltas_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_snapshots_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = Ustr::from("data.book.snapshots.XCME.ESZ24");
        let result = switchboard.get_book_snapshots_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard
            .book_snapshots_topics
            .contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_depth_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected_topic = Ustr::from("data.book.depth.XCME.ESZ24");
        let result = switchboard.get_depth_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.depth_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected_topic = Ustr::from("data.quotes.XCME.ESZ24");
        let result = switchboard.get_quotes_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.quote_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected_topic = Ustr::from("data.trades.XCME.ESZ24");
        let result = switchboard.get_trades_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.trade_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
        let expected_topic = Ustr::from(&format!("data.bars.{bar_type}"));
        let result = switchboard.get_bars_topic(bar_type);
        assert_eq!(result, expected_topic);
        assert!(switchboard.bar_topics.contains_key(&bar_type));
    }

    #[rstest]
    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
        let client_order_id = ClientOrderId::from("O-123456789");
        let expected_topic = Ustr::from(&format!("order.snapshots.{client_order_id}"));
        let result = switchboard.get_order_snapshots_topic(client_order_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard
            .order_snapshots_topics
            .contains_key(&client_order_id));
    }

    #[rstest]
    fn test_get_positions_snapshots_topic(mut switchboard: MessagingSwitchboard) {
        let position_id = PositionId::from("P-123456789");
        let expected_topic = Ustr::from(&format!("positions.snapshots.{position_id}"));
        let result = switchboard.get_positions_snapshots_topic(position_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard
            .positions_snapshots_topics
            .contains_key(&position_id));
    }

    #[rstest]
    fn test_get_event_orders_topic(mut switchboard: MessagingSwitchboard) {
        let strategy_id = StrategyId::from("S-001");
        let expected_topic = Ustr::from(&format!("events.order.{strategy_id}"));
        let result = switchboard.get_event_orders_topic(strategy_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.event_orders_topics.contains_key(&strategy_id));
    }

    #[rstest]
    fn test_get_event_positions_topic(mut switchboard: MessagingSwitchboard) {
        let strategy_id = StrategyId::from("S-001");
        let expected_topic = Ustr::from(&format!("events.position.{strategy_id}"));
        let result = switchboard.get_event_positions_topic(strategy_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard
            .event_positions_topics
            .contains_key(&strategy_id));
    }

    #[rstest]
    fn test_repeated_lookup_reuses_cached_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        // Two lookups for the same key yield the same topic and a single cache entry.
        let first = switchboard.get_quotes_topic(instrument_id);
        let second = switchboard.get_quotes_topic(instrument_id);
        assert_eq!(first, second);
        assert_eq!(switchboard.quote_topics.len(), 1);
    }
}