1use std::{num::NonZeroUsize, sync::OnceLock};
17
18use ahash::AHashMap;
19use nautilus_model::{
20 data::{BarType, DataType},
21 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
22};
23
24use super::core::{Endpoint, MStr, Topic};
25use crate::msgbus::get_message_bus;
26
27pub const CLOSE_TOPIC: &str = "CLOSE";
28
29static DATA_QUEUE_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
36static DATA_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
37static DATA_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
38static DATA_RESPONSE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
39static EXEC_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
40static EXEC_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
41static EXEC_RECONCILE_REPORT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
42static EXEC_RECONCILE_MASS_STATUS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
43static RISK_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
44static RISK_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
45static PORTFOLIO_UPDATE_ACCOUNT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
46
47macro_rules! define_switchboard {
48 ($(
49 $field:ident: $key_ty:ty,
50 $method:ident($($arg_name:ident: $arg_ty:ty),*) -> $key_expr:expr,
51 $val_fmt:expr,
52 $($val_args:expr),*
53 );* $(;)?) => {
54 #[derive(Clone, Debug)]
56 pub struct MessagingSwitchboard {
57 $(
58 $field: AHashMap<$key_ty, MStr<Topic>>,
59 )*
60 #[cfg(feature = "defi")]
61 pub(crate) defi: crate::defi::switchboard::DefiSwitchboard,
62 }
63
64 impl Default for MessagingSwitchboard {
65 fn default() -> Self {
67 Self {
68 $(
69 $field: AHashMap::new(),
70 )*
71 #[cfg(feature = "defi")]
72 defi: crate::defi::switchboard::DefiSwitchboard::default(),
73 }
74 }
75 }
76
77 impl MessagingSwitchboard {
78 #[inline]
80 #[must_use]
81 pub fn data_engine_queue_execute() -> MStr<Endpoint> {
82 *DATA_QUEUE_EXECUTE_ENDPOINT.get_or_init(|| "DataEngine.queue_execute".into())
83 }
84
85 #[inline]
86 #[must_use]
87 pub fn data_engine_execute() -> MStr<Endpoint> {
88 *DATA_EXECUTE_ENDPOINT.get_or_init(|| "DataEngine.execute".into())
89 }
90
91 #[inline]
92 #[must_use]
93 pub fn data_engine_process() -> MStr<Endpoint> {
94 *DATA_PROCESS_ENDPOINT.get_or_init(|| "DataEngine.process".into())
95 }
96
97 #[inline]
98 #[must_use]
99 pub fn data_engine_response() -> MStr<Endpoint> {
100 *DATA_RESPONSE_ENDPOINT.get_or_init(|| "DataEngine.response".into())
101 }
102
103 #[inline]
104 #[must_use]
105 pub fn exec_engine_execute() -> MStr<Endpoint> {
106 *EXEC_EXECUTE_ENDPOINT.get_or_init(|| "ExecEngine.execute".into())
107 }
108
109 #[inline]
110 #[must_use]
111 pub fn exec_engine_process() -> MStr<Endpoint> {
112 *EXEC_PROCESS_ENDPOINT.get_or_init(|| "ExecEngine.process".into())
113 }
114
115 #[inline]
116 #[must_use]
117 pub fn exec_engine_reconcile_execution_report() -> MStr<Endpoint> {
118 *EXEC_RECONCILE_REPORT_ENDPOINT.get_or_init(|| "ExecEngine.reconcile_execution_report".into())
119 }
120
121 #[inline]
122 #[must_use]
123 pub fn exec_engine_reconcile_execution_mass_status() -> MStr<Endpoint> {
124 *EXEC_RECONCILE_MASS_STATUS_ENDPOINT
125 .get_or_init(|| "ExecEngine.reconcile_execution_mass_status".into())
126 }
127
128 #[inline]
129 #[must_use]
130 pub fn risk_engine_execute() -> MStr<Endpoint> {
131 *RISK_EXECUTE_ENDPOINT.get_or_init(|| "RiskEngine.execute".into())
132 }
133
134 #[inline]
135 #[must_use]
136 pub fn risk_engine_process() -> MStr<Endpoint> {
137 *RISK_PROCESS_ENDPOINT.get_or_init(|| "RiskEngine.process".into())
138 }
139
140 #[inline]
141 #[must_use]
142 pub fn portfolio_update_account() -> MStr<Endpoint> {
143 *PORTFOLIO_UPDATE_ACCOUNT_ENDPOINT.get_or_init(|| "Portfolio.update_account".into())
144 }
145
146 $(
148 #[must_use]
149 pub fn $method(&mut self, $($arg_name: $arg_ty),*) -> MStr<Topic> {
150 let key = $key_expr;
151 *self.$field
152 .entry(key)
153 .or_insert_with(|| format!($val_fmt, $($val_args),*).into())
154 }
155 )*
156 }
157 };
158}
159
160define_switchboard! {
161 custom_topics: DataType,
162 get_custom_topic(data_type: &DataType) -> data_type.clone(),
163 "data.{}", data_type.topic();
164
165 instruments_topics: Venue,
166 get_instruments_topic(venue: Venue) -> venue,
167 "data.instrument.{}", venue;
168
169 instrument_topics: InstrumentId,
170 get_instrument_topic(instrument_id: InstrumentId) -> instrument_id,
171 "data.instrument.{}.{}", instrument_id.venue, instrument_id.symbol;
172
173 book_deltas_topics: InstrumentId,
174 get_book_deltas_topic(instrument_id: InstrumentId) -> instrument_id,
175 "data.book.deltas.{}.{}", instrument_id.venue, instrument_id.symbol;
176
177 book_depth10_topics: InstrumentId,
178 get_book_depth10_topic(instrument_id: InstrumentId) -> instrument_id,
179 "data.book.depth10.{}.{}", instrument_id.venue, instrument_id.symbol;
180
181 book_snapshots_topics: (InstrumentId, NonZeroUsize),
182 get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> (instrument_id, interval_ms),
183 "data.book.snapshots.{}.{}.{}", instrument_id.venue, instrument_id.symbol, interval_ms;
184
185 quote_topics: InstrumentId,
186 get_quotes_topic(instrument_id: InstrumentId) -> instrument_id,
187 "data.quotes.{}.{}", instrument_id.venue, instrument_id.symbol;
188
189 trade_topics: InstrumentId,
190 get_trades_topic(instrument_id: InstrumentId) -> instrument_id,
191 "data.trades.{}.{}", instrument_id.venue, instrument_id.symbol;
192
193 bar_topics: BarType,
194 get_bars_topic(bar_type: BarType) -> bar_type,
195 "data.bars.{}", bar_type;
196
197 mark_price_topics: InstrumentId,
198 get_mark_price_topic(instrument_id: InstrumentId) -> instrument_id,
199 "data.mark_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
200
201 index_price_topics: InstrumentId,
202 get_index_price_topic(instrument_id: InstrumentId) -> instrument_id,
203 "data.index_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
204
205 funding_rate_topics: InstrumentId,
206 get_funding_rate_topic(instrument_id: InstrumentId) -> instrument_id,
207 "data.funding_rates.{}.{}", instrument_id.venue, instrument_id.symbol;
208
209 instrument_status_topics: InstrumentId,
210 get_instrument_status_topic(instrument_id: InstrumentId) -> instrument_id,
211 "data.status.{}.{}", instrument_id.venue, instrument_id.symbol;
212
213 instrument_close_topics: InstrumentId,
214 get_instrument_close_topic(instrument_id: InstrumentId) -> instrument_id,
215 "data.close.{}.{}", instrument_id.venue, instrument_id.symbol;
216
217 order_fills_topics: InstrumentId,
218 get_order_fills_topic(instrument_id: InstrumentId) -> instrument_id,
219 "events.fills.{}", instrument_id;
220
221 order_cancels_topics: InstrumentId,
222 get_order_cancels_topic(instrument_id: InstrumentId) -> instrument_id,
223 "events.cancels.{}", instrument_id;
224
225 order_snapshots_topics: ClientOrderId,
226 get_order_snapshots_topic(client_order_id: ClientOrderId) -> client_order_id,
227 "order.snapshots.{}", client_order_id;
228
229 positions_snapshots_topics: PositionId,
230 get_positions_snapshots_topic(position_id: PositionId) -> position_id,
231 "positions.snapshots.{}", position_id;
232
233 event_orders_topics: StrategyId,
234 get_event_orders_topic(strategy_id: StrategyId) -> strategy_id,
235 "events.order.{}", strategy_id;
236
237 event_positions_topics: StrategyId,
238 get_event_positions_topic(strategy_id: StrategyId) -> strategy_id,
239 "events.position.{}", strategy_id;
240}
241
242macro_rules! define_wrappers {
249 ($($method:ident($($arg_name:ident: $arg_ty:ty),*) -> $ret:ty),* $(,)?) => {
250 $(
251 #[must_use]
252 pub fn $method($($arg_name: $arg_ty),*) -> $ret {
253 get_message_bus()
254 .borrow_mut()
255 .switchboard
256 .$method($($arg_name),*)
257 }
258 )*
259 }
260}
261
262define_wrappers! {
263 get_custom_topic(data_type: &DataType) -> MStr<Topic>,
264 get_instruments_topic(venue: Venue) -> MStr<Topic>,
265 get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic>,
266 get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic>,
267 get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic>,
268 get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> MStr<Topic>,
269 get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic>,
270 get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic>,
271 get_bars_topic(bar_type: BarType) -> MStr<Topic>,
272 get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
273 get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
274 get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic>,
275 get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic>,
276 get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic>,
277 get_order_fills_topic(instrument_id: InstrumentId) -> MStr<Topic>,
278 get_order_cancels_topic(instrument_id: InstrumentId) -> MStr<Topic>,
279 get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic>,
280 get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic>,
281 get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic>,
282 get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic>,
283}
284
285#[cfg(test)]
286mod tests {
287 use nautilus_model::{
288 data::{BarType, DataType},
289 identifiers::InstrumentId,
290 };
291 use rstest::*;
292
293 use super::*;
294
295 #[fixture]
296 fn switchboard() -> MessagingSwitchboard {
297 MessagingSwitchboard::default()
298 }
299
300 #[fixture]
301 fn instrument_id() -> InstrumentId {
302 InstrumentId::from("ESZ24.XCME")
303 }
304
305 #[rstest]
306 fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
307 let data_type = DataType::new("ExampleDataType", None);
308 let expected_topic = "data.ExampleDataType".into();
309 let result = switchboard.get_custom_topic(&data_type);
310 assert_eq!(result, expected_topic);
311 assert!(switchboard.custom_topics.contains_key(&data_type));
312 }
313
314 #[rstest]
315 fn test_get_instrument_topic(
316 mut switchboard: MessagingSwitchboard,
317 instrument_id: InstrumentId,
318 ) {
319 let expected_topic = "data.instrument.XCME.ESZ24".into();
320 let result = switchboard.get_instrument_topic(instrument_id);
321 assert_eq!(result, expected_topic);
322 assert!(switchboard.instrument_topics.contains_key(&instrument_id));
323 }
324
325 #[rstest]
326 fn test_get_book_deltas_topic(
327 mut switchboard: MessagingSwitchboard,
328 instrument_id: InstrumentId,
329 ) {
330 let expected_topic = "data.book.deltas.XCME.ESZ24".into();
331 let result = switchboard.get_book_deltas_topic(instrument_id);
332 assert_eq!(result, expected_topic);
333 assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
334 }
335
336 #[rstest]
337 fn test_get_book_depth10_topic(
338 mut switchboard: MessagingSwitchboard,
339 instrument_id: InstrumentId,
340 ) {
341 let expected_topic = "data.book.depth10.XCME.ESZ24".into();
342 let result = switchboard.get_book_depth10_topic(instrument_id);
343 assert_eq!(result, expected_topic);
344 assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
345 }
346
347 #[rstest]
348 fn test_get_book_snapshots_topic(
349 mut switchboard: MessagingSwitchboard,
350 instrument_id: InstrumentId,
351 ) {
352 let expected_topic = "data.book.snapshots.XCME.ESZ24.1000".into();
353 let interval_ms = NonZeroUsize::new(1000).unwrap();
354 let result = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);
355 assert_eq!(result, expected_topic);
356
357 assert!(
358 switchboard
359 .book_snapshots_topics
360 .contains_key(&(instrument_id, interval_ms))
361 );
362 }
363
364 #[rstest]
365 fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
366 let expected_topic = "data.quotes.XCME.ESZ24".into();
367 let result = switchboard.get_quotes_topic(instrument_id);
368 assert_eq!(result, expected_topic);
369 assert!(switchboard.quote_topics.contains_key(&instrument_id));
370 }
371
372 #[rstest]
373 fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
374 let expected_topic = "data.trades.XCME.ESZ24".into();
375 let result = switchboard.get_trades_topic(instrument_id);
376 assert_eq!(result, expected_topic);
377 assert!(switchboard.trade_topics.contains_key(&instrument_id));
378 }
379
380 #[rstest]
381 fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
382 let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
383 let expected_topic = format!("data.bars.{bar_type}").into();
384 let result = switchboard.get_bars_topic(bar_type);
385 assert_eq!(result, expected_topic);
386 assert!(switchboard.bar_topics.contains_key(&bar_type));
387 }
388
389 #[rstest]
390 fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
391 let client_order_id = ClientOrderId::from("O-123456789");
392 let expected_topic = format!("order.snapshots.{client_order_id}").into();
393 let result = switchboard.get_order_snapshots_topic(client_order_id);
394 assert_eq!(result, expected_topic);
395 assert!(
396 switchboard
397 .order_snapshots_topics
398 .contains_key(&client_order_id)
399 );
400 }
401}