1use std::num::NonZeroUsize;
17
18use ahash::AHashMap;
19use nautilus_model::{
20 data::{BarType, DataType},
21 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
22};
23
24use super::core::{Endpoint, MStr, Topic};
25use crate::msgbus::get_message_bus;
26
27pub const CLOSE_TOPIC: &str = "CLOSE";
28
29#[must_use]
30pub fn get_custom_topic(data_type: &DataType) -> MStr<Topic> {
31 get_message_bus()
32 .borrow_mut()
33 .switchboard
34 .get_custom_topic(data_type)
35}
36
37#[must_use]
38pub fn get_instruments_topic(venue: Venue) -> MStr<Topic> {
39 get_message_bus()
40 .borrow_mut()
41 .switchboard
42 .get_instruments_topic(venue)
43}
44
45#[must_use]
46pub fn get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic> {
47 get_message_bus()
48 .borrow_mut()
49 .switchboard
50 .get_instrument_topic(instrument_id)
51}
52
53#[must_use]
54pub fn get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic> {
55 get_message_bus()
56 .borrow_mut()
57 .switchboard
58 .get_book_deltas_topic(instrument_id)
59}
60
61#[must_use]
62pub fn get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic> {
63 get_message_bus()
64 .borrow_mut()
65 .switchboard
66 .get_book_depth10_topic(instrument_id)
67}
68
69#[must_use]
70pub fn get_book_snapshots_topic(
71 instrument_id: InstrumentId,
72 interval_ms: NonZeroUsize,
73) -> MStr<Topic> {
74 get_message_bus()
75 .borrow_mut()
76 .switchboard
77 .get_book_snapshots_topic(instrument_id, interval_ms)
78}
79
80#[must_use]
81pub fn get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic> {
82 get_message_bus()
83 .borrow_mut()
84 .switchboard
85 .get_quotes_topic(instrument_id)
86}
87
88#[must_use]
89pub fn get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic> {
90 get_message_bus()
91 .borrow_mut()
92 .switchboard
93 .get_trades_topic(instrument_id)
94}
95
96#[must_use]
97pub fn get_bars_topic(bar_type: BarType) -> MStr<Topic> {
98 get_message_bus()
99 .borrow_mut()
100 .switchboard
101 .get_bars_topic(bar_type)
102}
103
104#[must_use]
105pub fn get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
106 get_message_bus()
107 .borrow_mut()
108 .switchboard
109 .get_mark_price_topic(instrument_id)
110}
111
112#[must_use]
113pub fn get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic> {
114 get_message_bus()
115 .borrow_mut()
116 .switchboard
117 .get_index_price_topic(instrument_id)
118}
119
120#[must_use]
121pub fn get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic> {
122 get_message_bus()
123 .borrow_mut()
124 .switchboard
125 .get_funding_rate_topic(instrument_id)
126}
127
128#[must_use]
129pub fn get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic> {
130 get_message_bus()
131 .borrow_mut()
132 .switchboard
133 .get_instrument_status_topic(instrument_id)
134}
135
136#[must_use]
137pub fn get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic> {
138 get_message_bus()
139 .borrow_mut()
140 .switchboard
141 .get_instrument_close_topic(instrument_id)
142}
143
144#[must_use]
145pub fn get_order_fills_topic(instrument_id: InstrumentId) -> MStr<Topic> {
146 get_message_bus()
147 .borrow_mut()
148 .switchboard
149 .get_order_fills_topic(instrument_id)
150}
151
152#[must_use]
153pub fn get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic> {
154 get_message_bus()
155 .borrow_mut()
156 .switchboard
157 .get_order_snapshots_topic(client_order_id)
158}
159
160#[must_use]
161pub fn get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic> {
162 get_message_bus()
163 .borrow_mut()
164 .switchboard
165 .get_positions_snapshots_topic(position_id)
166}
167
168#[must_use]
169pub fn get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic> {
170 get_message_bus()
171 .borrow_mut()
172 .switchboard
173 .get_event_orders_topic(strategy_id)
174}
175
176#[must_use]
177pub fn get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic> {
178 get_message_bus()
179 .borrow_mut()
180 .switchboard
181 .get_event_positions_topic(strategy_id)
182}
183
184#[derive(Clone, Debug)]
186pub struct MessagingSwitchboard {
187 custom_topics: AHashMap<DataType, MStr<Topic>>,
188 instruments_topics: AHashMap<Venue, MStr<Topic>>,
189 instrument_topics: AHashMap<InstrumentId, MStr<Topic>>,
190 book_deltas_topics: AHashMap<InstrumentId, MStr<Topic>>,
191 book_depth10_topics: AHashMap<InstrumentId, MStr<Topic>>,
192 book_snapshots_topics: AHashMap<InstrumentId, MStr<Topic>>,
193 quote_topics: AHashMap<InstrumentId, MStr<Topic>>,
194 trade_topics: AHashMap<InstrumentId, MStr<Topic>>,
195 bar_topics: AHashMap<BarType, MStr<Topic>>,
196 mark_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
197 index_price_topics: AHashMap<InstrumentId, MStr<Topic>>,
198 funding_rate_topics: AHashMap<InstrumentId, MStr<Topic>>,
199 instrument_status_topics: AHashMap<InstrumentId, MStr<Topic>>,
200 instrument_close_topics: AHashMap<InstrumentId, MStr<Topic>>,
201 order_fills_topics: AHashMap<InstrumentId, MStr<Topic>>,
202 event_orders_topics: AHashMap<StrategyId, MStr<Topic>>,
203 event_positions_topics: AHashMap<StrategyId, MStr<Topic>>,
204 order_snapshots_topics: AHashMap<ClientOrderId, MStr<Topic>>,
205 positions_snapshots_topics: AHashMap<PositionId, MStr<Topic>>,
206 #[cfg(feature = "defi")]
207 pub(crate) defi: crate::defi::switchboard::DefiSwitchboard,
208}
209
210impl Default for MessagingSwitchboard {
211 fn default() -> Self {
213 Self {
214 custom_topics: AHashMap::new(),
215 instruments_topics: AHashMap::new(),
216 instrument_topics: AHashMap::new(),
217 book_deltas_topics: AHashMap::new(),
218 book_snapshots_topics: AHashMap::new(),
219 book_depth10_topics: AHashMap::new(),
220 quote_topics: AHashMap::new(),
221 trade_topics: AHashMap::new(),
222 mark_price_topics: AHashMap::new(),
223 index_price_topics: AHashMap::new(),
224 funding_rate_topics: AHashMap::new(),
225 bar_topics: AHashMap::new(),
226 instrument_status_topics: AHashMap::new(),
227 instrument_close_topics: AHashMap::new(),
228 order_fills_topics: AHashMap::new(),
229 order_snapshots_topics: AHashMap::new(),
230 event_orders_topics: AHashMap::new(),
231 event_positions_topics: AHashMap::new(),
232 positions_snapshots_topics: AHashMap::new(),
233 #[cfg(feature = "defi")]
234 defi: crate::defi::switchboard::DefiSwitchboard::default(),
235 }
236 }
237}
238
239impl MessagingSwitchboard {
240 #[must_use]
241 pub fn data_engine_queue_execute() -> MStr<Endpoint> {
242 "DataEngine.queue_execute".into()
243 }
244
245 #[must_use]
246 pub fn data_engine_execute() -> MStr<Endpoint> {
247 "DataEngine.execute".into()
248 }
249
250 #[must_use]
251 pub fn data_engine_process() -> MStr<Endpoint> {
252 "DataEngine.process".into()
253 }
254
255 #[must_use]
256 pub fn data_engine_response() -> MStr<Endpoint> {
257 "DataEngine.response".into()
258 }
259
260 #[must_use]
261 pub fn exec_engine_execute() -> MStr<Endpoint> {
262 "ExecEngine.execute".into()
263 }
264
265 #[must_use]
266 pub fn exec_engine_process() -> MStr<Endpoint> {
267 "ExecEngine.process".into()
268 }
269
270 #[must_use]
271 pub fn get_custom_topic(&mut self, data_type: &DataType) -> MStr<Topic> {
272 *self
273 .custom_topics
274 .entry(data_type.clone())
275 .or_insert_with(|| format!("data.{}", data_type.topic()).into())
276 }
277
278 #[must_use]
279 pub fn get_instruments_topic(&mut self, venue: Venue) -> MStr<Topic> {
280 *self
281 .instruments_topics
282 .entry(venue)
283 .or_insert_with(|| format!("data.instrument.{venue}").into())
284 }
285
286 #[must_use]
287 pub fn get_instrument_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
288 *self
289 .instrument_topics
290 .entry(instrument_id)
291 .or_insert_with(|| {
292 format!(
293 "data.instrument.{}.{}",
294 instrument_id.venue, instrument_id.symbol
295 )
296 .into()
297 })
298 }
299
300 #[must_use]
301 pub fn get_book_deltas_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
302 *self
303 .book_deltas_topics
304 .entry(instrument_id)
305 .or_insert_with(|| {
306 format!(
307 "data.book.deltas.{}.{}",
308 instrument_id.venue, instrument_id.symbol
309 )
310 .into()
311 })
312 }
313
314 #[must_use]
315 pub fn get_book_depth10_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
316 *self
317 .book_depth10_topics
318 .entry(instrument_id)
319 .or_insert_with(|| {
320 format!(
321 "data.book.depth10.{}.{}",
322 instrument_id.venue, instrument_id.symbol
323 )
324 .into()
325 })
326 }
327
328 #[must_use]
329 pub fn get_book_snapshots_topic(
330 &mut self,
331 instrument_id: InstrumentId,
332 interval_ms: NonZeroUsize,
333 ) -> MStr<Topic> {
334 *self
335 .book_snapshots_topics
336 .entry(instrument_id)
337 .or_insert_with(|| {
338 format!(
339 "data.book.snapshots.{}.{}.{}",
340 instrument_id.venue, instrument_id.symbol, interval_ms
341 )
342 .into()
343 })
344 }
345
346 #[must_use]
347 pub fn get_quotes_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
348 *self.quote_topics.entry(instrument_id).or_insert_with(|| {
349 format!(
350 "data.quotes.{}.{}",
351 instrument_id.venue, instrument_id.symbol
352 )
353 .into()
354 })
355 }
356
357 #[must_use]
358 pub fn get_trades_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
359 *self.trade_topics.entry(instrument_id).or_insert_with(|| {
360 format!(
361 "data.trades.{}.{}",
362 instrument_id.venue, instrument_id.symbol
363 )
364 .into()
365 })
366 }
367
368 #[must_use]
369 pub fn get_bars_topic(&mut self, bar_type: BarType) -> MStr<Topic> {
370 *self
371 .bar_topics
372 .entry(bar_type)
373 .or_insert_with(|| format!("data.bars.{bar_type}").into())
374 }
375
376 #[must_use]
377 pub fn get_mark_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
378 *self
379 .mark_price_topics
380 .entry(instrument_id)
381 .or_insert_with(|| {
382 format!(
383 "data.mark_prices.{}.{}",
384 instrument_id.venue, instrument_id.symbol
385 )
386 .into()
387 })
388 }
389
390 #[must_use]
391 pub fn get_index_price_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
392 *self
393 .index_price_topics
394 .entry(instrument_id)
395 .or_insert_with(|| {
396 format!(
397 "data.index_prices.{}.{}",
398 instrument_id.venue, instrument_id.symbol
399 )
400 .into()
401 })
402 }
403
404 pub fn get_funding_rate_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
405 *self
406 .funding_rate_topics
407 .entry(instrument_id)
408 .or_insert_with(|| {
409 format!(
410 "data.funding_rates.{}.{}",
411 instrument_id.venue, instrument_id.symbol
412 )
413 .into()
414 })
415 }
416
417 #[must_use]
418 pub fn get_instrument_status_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
419 *self
420 .instrument_status_topics
421 .entry(instrument_id)
422 .or_insert_with(|| {
423 format!(
424 "data.status.{}.{}",
425 instrument_id.venue, instrument_id.symbol
426 )
427 .into()
428 })
429 }
430
431 #[must_use]
432 pub fn get_instrument_close_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
433 *self
434 .instrument_close_topics
435 .entry(instrument_id)
436 .or_insert_with(|| {
437 format!(
438 "data.close.{}.{}",
439 instrument_id.venue, instrument_id.symbol
440 )
441 .into()
442 })
443 }
444
445 #[must_use]
446 pub fn get_order_fills_topic(&mut self, instrument_id: InstrumentId) -> MStr<Topic> {
447 *self
448 .order_fills_topics
449 .entry(instrument_id)
450 .or_insert_with(|| format!("events.fills.{instrument_id}").into())
451 }
452
453 #[must_use]
454 pub fn get_order_snapshots_topic(&mut self, client_order_id: ClientOrderId) -> MStr<Topic> {
455 *self
456 .order_snapshots_topics
457 .entry(client_order_id)
458 .or_insert_with(|| format!("order.snapshots.{client_order_id}").into())
459 }
460
461 #[must_use]
462 pub fn get_positions_snapshots_topic(&mut self, position_id: PositionId) -> MStr<Topic> {
463 *self
464 .positions_snapshots_topics
465 .entry(position_id)
466 .or_insert_with(|| format!("positions.snapshots.{position_id}").into())
467 }
468
469 #[must_use]
470 pub fn get_event_orders_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
471 *self
472 .event_orders_topics
473 .entry(strategy_id)
474 .or_insert_with(|| format!("events.order.{strategy_id}").into())
475 }
476
477 #[must_use]
478 pub fn get_event_positions_topic(&mut self, strategy_id: StrategyId) -> MStr<Topic> {
479 *self
480 .event_positions_topics
481 .entry(strategy_id)
482 .or_insert_with(|| format!("events.position.{strategy_id}").into())
483 }
484}
485
486#[cfg(test)]
490mod tests {
491 use nautilus_model::{
492 data::{BarType, DataType},
493 identifiers::InstrumentId,
494 };
495 use rstest::*;
496
497 use super::*;
498
499 #[fixture]
500 fn switchboard() -> MessagingSwitchboard {
501 MessagingSwitchboard::default()
502 }
503
504 #[fixture]
505 fn instrument_id() -> InstrumentId {
506 InstrumentId::from("ESZ24.XCME")
507 }
508
509 #[rstest]
510 fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
511 let data_type = DataType::new("ExampleDataType", None);
512 let expected_topic = "data.ExampleDataType".into();
513 let result = switchboard.get_custom_topic(&data_type);
514 assert_eq!(result, expected_topic);
515 assert!(switchboard.custom_topics.contains_key(&data_type));
516 }
517
518 #[rstest]
519 fn test_get_instrument_topic(
520 mut switchboard: MessagingSwitchboard,
521 instrument_id: InstrumentId,
522 ) {
523 let expected_topic = "data.instrument.XCME.ESZ24".into();
524 let result = switchboard.get_instrument_topic(instrument_id);
525 assert_eq!(result, expected_topic);
526 assert!(switchboard.instrument_topics.contains_key(&instrument_id));
527 }
528
529 #[rstest]
530 fn test_get_book_deltas_topic(
531 mut switchboard: MessagingSwitchboard,
532 instrument_id: InstrumentId,
533 ) {
534 let expected_topic = "data.book.deltas.XCME.ESZ24".into();
535 let result = switchboard.get_book_deltas_topic(instrument_id);
536 assert_eq!(result, expected_topic);
537 assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
538 }
539
540 #[rstest]
541 fn test_get_book_depth10_topic(
542 mut switchboard: MessagingSwitchboard,
543 instrument_id: InstrumentId,
544 ) {
545 let expected_topic = "data.book.depth10.XCME.ESZ24".into();
546 let result = switchboard.get_book_depth10_topic(instrument_id);
547 assert_eq!(result, expected_topic);
548 assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
549 }
550
551 #[rstest]
552 fn test_get_book_snapshots_topic(
553 mut switchboard: MessagingSwitchboard,
554 instrument_id: InstrumentId,
555 ) {
556 let expected_topic = "data.book.snapshots.XCME.ESZ24.1000".into();
557 let interval_ms = NonZeroUsize::new(1000).unwrap();
558 let result = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);
559 assert_eq!(result, expected_topic);
560 assert!(
561 switchboard
562 .book_snapshots_topics
563 .contains_key(&instrument_id)
564 );
565 }
566
567 #[rstest]
568 fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
569 let expected_topic = "data.quotes.XCME.ESZ24".into();
570 let result = switchboard.get_quotes_topic(instrument_id);
571 assert_eq!(result, expected_topic);
572 assert!(switchboard.quote_topics.contains_key(&instrument_id));
573 }
574
575 #[rstest]
576 fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
577 let expected_topic = "data.trades.XCME.ESZ24".into();
578 let result = switchboard.get_trades_topic(instrument_id);
579 assert_eq!(result, expected_topic);
580 assert!(switchboard.trade_topics.contains_key(&instrument_id));
581 }
582
583 #[rstest]
584 fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
585 let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
586 let expected_topic = format!("data.bars.{bar_type}").into();
587 let result = switchboard.get_bars_topic(bar_type);
588 assert_eq!(result, expected_topic);
589 assert!(switchboard.bar_topics.contains_key(&bar_type));
590 }
591
592 #[rstest]
593 fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
594 let client_order_id = ClientOrderId::from("O-123456789");
595 let expected_topic = format!("order.snapshots.{client_order_id}").into();
596 let result = switchboard.get_order_snapshots_topic(client_order_id);
597 assert_eq!(result, expected_topic);
598 assert!(
599 switchboard
600 .order_snapshots_topics
601 .contains_key(&client_order_id)
602 );
603 }
604}