1use std::collections::HashMap;
17
18use nautilus_model::{
19 data::{BarType, DataType},
20 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
21};
22use ustr::Ustr;
23
24use crate::msgbus::get_message_bus;
25
26#[must_use]
27pub fn get_custom_topic(data_type: &DataType) -> Ustr {
28 get_message_bus()
29 .borrow_mut()
30 .switchboard
31 .get_custom_topic(data_type)
32}
33
34#[must_use]
35pub fn get_instruments_topic(venue: Venue) -> Ustr {
36 get_message_bus()
37 .borrow_mut()
38 .switchboard
39 .get_instruments_topic(venue)
40}
41
42#[must_use]
43pub fn get_instrument_topic(instrument_id: InstrumentId) -> Ustr {
44 get_message_bus()
45 .borrow_mut()
46 .switchboard
47 .get_instrument_topic(instrument_id)
48}
49
50#[must_use]
51pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> Ustr {
52 get_message_bus()
53 .borrow_mut()
54 .switchboard
55 .get_book_deltas_topic(instrument_id)
56}
57
58#[must_use]
59pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> Ustr {
60 get_message_bus()
61 .borrow_mut()
62 .switchboard
63 .get_book_depth10_topic(instrument_id)
64}
65
66#[must_use]
67pub fn get_book_snapshots_topic(instrument_id: InstrumentId) -> Ustr {
68 get_message_bus()
69 .borrow_mut()
70 .switchboard
71 .get_book_snapshots_topic(instrument_id)
72}
73
74#[must_use]
75pub fn get_quotes_topic(instrument_id: InstrumentId) -> Ustr {
76 get_message_bus()
77 .borrow_mut()
78 .switchboard
79 .get_quotes_topic(instrument_id)
80}
81
82#[must_use]
83pub fn get_trades_topic(instrument_id: InstrumentId) -> Ustr {
84 get_message_bus()
85 .borrow_mut()
86 .switchboard
87 .get_trades_topic(instrument_id)
88}
89
90#[must_use]
91pub fn get_bars_topic(bar_type: BarType) -> Ustr {
92 get_message_bus()
93 .borrow_mut()
94 .switchboard
95 .get_bars_topic(bar_type)
96}
97
98#[must_use]
99pub fn get_mark_price_topic(instrument_id: InstrumentId) -> Ustr {
100 get_message_bus()
101 .borrow_mut()
102 .switchboard
103 .get_mark_price_topic(instrument_id)
104}
105
106#[must_use]
107pub fn get_index_price_topic(instrument_id: InstrumentId) -> Ustr {
108 get_message_bus()
109 .borrow_mut()
110 .switchboard
111 .get_index_price_topic(instrument_id)
112}
113
114#[must_use]
115pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> Ustr {
116 get_message_bus()
117 .borrow_mut()
118 .switchboard
119 .get_instrument_status_topic(instrument_id)
120}
121
122#[must_use]
123pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> Ustr {
124 get_message_bus()
125 .borrow_mut()
126 .switchboard
127 .get_instrument_close_topic(instrument_id)
128}
129
130#[must_use]
131pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> Ustr {
132 get_message_bus()
133 .borrow_mut()
134 .switchboard
135 .get_order_snapshots_topic(client_order_id)
136}
137
138#[must_use]
139pub fn get_positions_snapshots_topic(position_id: PositionId) -> Ustr {
140 get_message_bus()
141 .borrow_mut()
142 .switchboard
143 .get_positions_snapshots_topic(position_id)
144}
145
146#[must_use]
147pub fn get_event_orders_topic(strategy_id: StrategyId) -> Ustr {
148 get_message_bus()
149 .borrow_mut()
150 .switchboard
151 .get_event_orders_topic(strategy_id)
152}
153
154#[must_use]
155pub fn get_event_positions_topic(strategy_id: StrategyId) -> Ustr {
156 get_message_bus()
157 .borrow_mut()
158 .switchboard
159 .get_event_positions_topic(strategy_id)
160}
161
162#[derive(Clone, Debug)]
164pub struct MessagingSwitchboard {
165 custom_topics: HashMap<DataType, Ustr>,
166 instruments_topics: HashMap<Venue, Ustr>,
167 instrument_topics: HashMap<InstrumentId, Ustr>,
168 book_deltas_topics: HashMap<InstrumentId, Ustr>,
169 book_depth10_topics: HashMap<InstrumentId, Ustr>,
170 book_snapshots_topics: HashMap<InstrumentId, Ustr>,
171 quote_topics: HashMap<InstrumentId, Ustr>,
172 trade_topics: HashMap<InstrumentId, Ustr>,
173 bar_topics: HashMap<BarType, Ustr>,
174 mark_price_topics: HashMap<InstrumentId, Ustr>,
175 index_price_topics: HashMap<InstrumentId, Ustr>,
176 instrument_status_topics: HashMap<InstrumentId, Ustr>,
177 instrument_close_topics: HashMap<InstrumentId, Ustr>,
178 event_orders_topics: HashMap<StrategyId, Ustr>,
179 event_positions_topics: HashMap<StrategyId, Ustr>,
180 order_snapshots_topics: HashMap<ClientOrderId, Ustr>,
181 positions_snapshots_topics: HashMap<PositionId, Ustr>,
182}
183
184impl Default for MessagingSwitchboard {
185 fn default() -> Self {
187 Self {
188 custom_topics: HashMap::new(),
189 instruments_topics: HashMap::new(),
190 instrument_topics: HashMap::new(),
191 book_deltas_topics: HashMap::new(),
192 book_snapshots_topics: HashMap::new(),
193 book_depth10_topics: HashMap::new(),
194 quote_topics: HashMap::new(),
195 trade_topics: HashMap::new(),
196 mark_price_topics: HashMap::new(),
197 index_price_topics: HashMap::new(),
198 bar_topics: HashMap::new(),
199 instrument_status_topics: HashMap::new(),
200 instrument_close_topics: HashMap::new(),
201 order_snapshots_topics: HashMap::new(),
202 event_orders_topics: HashMap::new(),
203 event_positions_topics: HashMap::new(),
204 positions_snapshots_topics: HashMap::new(),
205 }
206 }
207}
208
209impl MessagingSwitchboard {
210 #[must_use]
211 pub fn data_engine_execute() -> Ustr {
212 Ustr::from("DataEngine.execute")
213 }
214
215 #[must_use]
216 pub fn data_engine_process() -> Ustr {
217 Ustr::from("DataEngine.process")
218 }
219
220 #[must_use]
221 pub fn exec_engine_execute() -> Ustr {
222 Ustr::from("ExecEngine.execute")
223 }
224
225 #[must_use]
226 pub fn exec_engine_process() -> Ustr {
227 Ustr::from("ExecEngine.process")
228 }
229
230 #[must_use]
231 pub fn get_custom_topic(&mut self, data_type: &DataType) -> Ustr {
232 *self
233 .custom_topics
234 .entry(data_type.clone())
235 .or_insert_with(|| Ustr::from(&format!("data.{}", data_type.topic())))
236 }
237
238 #[must_use]
239 pub fn get_instruments_topic(&mut self, venue: Venue) -> Ustr {
240 *self
241 .instruments_topics
242 .entry(venue)
243 .or_insert_with(|| Ustr::from(&format!("data.instrument.{}", venue)))
244 }
245
246 #[must_use]
247 pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
248 *self
249 .instrument_topics
250 .entry(instrument_id)
251 .or_insert_with(|| {
252 Ustr::from(&format!(
253 "data.instrument.{}.{}",
254 instrument_id.venue, instrument_id.symbol
255 ))
256 })
257 }
258
259 #[must_use]
260 pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
261 *self
262 .book_deltas_topics
263 .entry(instrument_id)
264 .or_insert_with(|| {
265 Ustr::from(&format!(
266 "data.book.deltas.{}.{}",
267 instrument_id.venue, instrument_id.symbol
268 ))
269 })
270 }
271
272 #[must_use]
273 pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
274 *self
275 .book_depth10_topics
276 .entry(instrument_id)
277 .or_insert_with(|| {
278 Ustr::from(&format!(
279 "data.book.depth10.{}.{}",
280 instrument_id.venue, instrument_id.symbol
281 ))
282 })
283 }
284
285 #[must_use]
286 pub fn get_book_snapshots_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
287 *self
288 .book_snapshots_topics
289 .entry(instrument_id)
290 .or_insert_with(|| {
291 Ustr::from(&format!(
292 "data.book.snapshots.{}.{}",
293 instrument_id.venue, instrument_id.symbol
294 ))
295 })
296 }
297
298 #[must_use]
299 pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
300 *self.quote_topics.entry(instrument_id).or_insert_with(|| {
301 Ustr::from(&format!(
302 "data.quotes.{}.{}",
303 instrument_id.venue, instrument_id.symbol
304 ))
305 })
306 }
307
308 #[must_use]
309 pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
310 *self.trade_topics.entry(instrument_id).or_insert_with(|| {
311 Ustr::from(&format!(
312 "data.trades.{}.{}",
313 instrument_id.venue, instrument_id.symbol
314 ))
315 })
316 }
317
318 #[must_use]
319 pub fn get_bars_topic(&mut self, bar_type: BarType) -> Ustr {
320 *self
321 .bar_topics
322 .entry(bar_type)
323 .or_insert_with(|| Ustr::from(&format!("data.bars.{bar_type}")))
324 }
325
326 #[must_use]
327 pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
328 *self
329 .mark_price_topics
330 .entry(instrument_id)
331 .or_insert_with(|| {
332 Ustr::from(&format!(
333 "data.mark_prices.{}.{}",
334 instrument_id.venue, instrument_id.symbol
335 ))
336 })
337 }
338
339 #[must_use]
340 pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
341 *self
342 .index_price_topics
343 .entry(instrument_id)
344 .or_insert_with(|| {
345 Ustr::from(&format!(
346 "data.index_prices.{}.{}",
347 instrument_id.venue, instrument_id.symbol
348 ))
349 })
350 }
351
352 #[must_use]
353 pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
354 *self
355 .instrument_status_topics
356 .entry(instrument_id)
357 .or_insert_with(|| {
358 Ustr::from(&format!(
359 "data.status.{}.{}",
360 instrument_id.venue, instrument_id.symbol
361 ))
362 })
363 }
364
365 #[must_use]
366 pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> Ustr {
367 *self
368 .instrument_close_topics
369 .entry(instrument_id)
370 .or_insert_with(|| {
371 Ustr::from(&format!(
372 "data.close.{}.{}",
373 instrument_id.venue, instrument_id.symbol
374 ))
375 })
376 }
377
378 #[must_use]
379 pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> Ustr {
380 *self
381 .order_snapshots_topics
382 .entry(client_order_id)
383 .or_insert_with(|| Ustr::from(&format!("order.snapshots.{client_order_id}")))
384 }
385
386 #[must_use]
387 pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> Ustr {
388 *self
389 .positions_snapshots_topics
390 .entry(position_id)
391 .or_insert_with(|| Ustr::from(&format!("positions.snapshots.{position_id}")))
392 }
393
394 #[must_use]
395 pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> Ustr {
396 *self
397 .event_orders_topics
398 .entry(strategy_id)
399 .or_insert_with(|| Ustr::from(&format!("events.order.{strategy_id}")))
400 }
401
402 #[must_use]
403 pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> Ustr {
404 *self
405 .event_positions_topics
406 .entry(strategy_id)
407 .or_insert_with(|| Ustr::from(&format!("events.position.{strategy_id}")))
408 }
409}
410
411#[cfg(test)]
415mod tests {
416 use nautilus_model::{
417 data::{BarType, DataType},
418 identifiers::InstrumentId,
419 };
420 use rstest::*;
421
422 use super::*;
423
424 #[fixture]
425 fn switchboard() -> MessagingSwitchboard {
426 MessagingSwitchboard::default()
427 }
428
429 #[fixture]
430 fn instrument_id() -> InstrumentId {
431 InstrumentId::from("ESZ24.XCME")
432 }
433
434 #[rstest]
435 fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
436 let data_type = DataType::new("ExampleDataType", None);
437 let expected_topic = Ustr::from("data.ExampleDataType");
438 let result = switchboard.get_custom_topic(&data_type);
439 assert_eq!(result, expected_topic);
440 assert!(switchboard.custom_topics.contains_key(&data_type));
441 }
442
443 #[rstest]
444 fn test_get_instrument_topic(
445 mut switchboard: MessagingSwitchboard,
446 instrument_id: InstrumentId,
447 ) {
448 let expected_topic = Ustr::from("data.instrument.XCME.ESZ24");
449 let result = switchboard.get_instrument_topic(instrument_id);
450 assert_eq!(result, expected_topic);
451 assert!(switchboard.instrument_topics.contains_key(&instrument_id));
452 }
453
454 #[rstest]
455 fn test_get_book_deltas_topic(
456 mut switchboard: MessagingSwitchboard,
457 instrument_id: InstrumentId,
458 ) {
459 let expected_topic = Ustr::from("data.book.deltas.XCME.ESZ24");
460 let result = switchboard.get_book_deltas_topic(instrument_id);
461 assert_eq!(result, expected_topic);
462 assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
463 }
464
465 #[rstest]
466 fn test_get_book_depth10_topic(
467 mut switchboard: MessagingSwitchboard,
468 instrument_id: InstrumentId,
469 ) {
470 let expected_topic = Ustr::from("data.book.depth10.XCME.ESZ24");
471 let result = switchboard.get_book_depth10_topic(instrument_id);
472 assert_eq!(result, expected_topic);
473 assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
474 }
475
476 #[rstest]
477 fn test_get_book_snapshots_topic(
478 mut switchboard: MessagingSwitchboard,
479 instrument_id: InstrumentId,
480 ) {
481 let expected_topic = Ustr::from("data.book.snapshots.XCME.ESZ24");
482 let result = switchboard.get_book_snapshots_topic(instrument_id);
483 assert_eq!(result, expected_topic);
484 assert!(
485 switchboard
486 .book_snapshots_topics
487 .contains_key(&instrument_id)
488 );
489 }
490
491 #[rstest]
492 fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
493 let expected_topic = Ustr::from("data.quotes.XCME.ESZ24");
494 let result = switchboard.get_quotes_topic(instrument_id);
495 assert_eq!(result, expected_topic);
496 assert!(switchboard.quote_topics.contains_key(&instrument_id));
497 }
498
499 #[rstest]
500 fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
501 let expected_topic = Ustr::from("data.trades.XCME.ESZ24");
502 let result = switchboard.get_trades_topic(instrument_id);
503 assert_eq!(result, expected_topic);
504 assert!(switchboard.trade_topics.contains_key(&instrument_id));
505 }
506
507 #[rstest]
508 fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
509 let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
510 let expected_topic = Ustr::from(&format!("data.bars.{bar_type}"));
511 let result = switchboard.get_bars_topic(bar_type);
512 assert_eq!(result, expected_topic);
513 assert!(switchboard.bar_topics.contains_key(&bar_type));
514 }
515
516 #[rstest]
517 fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
518 let client_order_id = ClientOrderId::from("O-123456789");
519 let expected_topic = Ustr::from(&format!("order.snapshots.{client_order_id}"));
520 let result = switchboard.get_order_snapshots_topic(client_order_id);
521 assert_eq!(result, expected_topic);
522 assert!(
523 switchboard
524 .order_snapshots_topics
525 .contains_key(&client_order_id)
526 );
527 }
528}