1use std::{num::NonZeroUsize, sync::OnceLock};
17
18use ahash::AHashMap;
19use nautilus_model::{
20 data::{BarType, DataType},
21 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
22};
23
24use super::mstr::{Endpoint, MStr, Topic};
25use crate::msgbus::get_message_bus;
26
27pub const CLOSE_TOPIC: &str = "CLOSE";
28
29static DATA_QUEUE_COMMAND_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
30static DATA_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
31static DATA_PROCESS_ANY_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
32static DATA_PROCESS_DATA_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
33#[cfg(feature = "defi")]
34static DATA_PROCESS_DEFI_DATA_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
35static DATA_RESPONSE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
36static EXEC_QUEUE_COMMAND_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
37static EXEC_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
38static EXEC_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
39static EXEC_RECONCILE_REPORT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
40static RISK_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
41static RISK_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
42static ORDER_EMULATOR_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
43static PORTFOLIO_ACCOUNT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
44
// Generates `MessagingSwitchboard` from a `;`-separated list of topic families.
//
// Each entry supplies, in order:
//   - the cache field name and its key type,
//   - the getter name, its arguments, and the expression that builds the cache key,
//   - the `format!` template and the arguments used to build the topic string.
//
// The fixed endpoint accessors are emitted once, independent of the entries.
macro_rules! define_switchboard {
    ($(
        $field:ident: $key_ty:ty,
        $method:ident($($arg_name:ident: $arg_ty:ty),*) -> $key_expr:expr,
        $val_fmt:expr,
        $($val_args:expr),*
    );* $(;)?) => {
        /// Switchboard exposing the fixed engine endpoint names and caching the
        /// topic built for each key of every topic family.
        #[derive(Clone, Debug)]
        pub struct MessagingSwitchboard {
            $(
                /// Topics already built by the matching getter, stored by lookup key.
                $field: AHashMap<$key_ty, MStr<Topic>>,
            )*
            /// DeFi-specific switchboard; only present with the `defi` feature.
            #[cfg(feature = "defi")]
            pub(crate) defi: crate::defi::switchboard::DefiSwitchboard,
        }

        impl Default for MessagingSwitchboard {
            /// Creates a switchboard in which every topic cache is empty.
            fn default() -> Self {
                Self {
                    $(
                        $field: AHashMap::new(),
                    )*
                    #[cfg(feature = "defi")]
                    defi: crate::defi::switchboard::DefiSwitchboard::default(),
                }
            }
        }

        impl MessagingSwitchboard {
            /// Returns the `DataEngine.queue_execute` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn data_engine_queue_execute() -> MStr<Endpoint> {
                let endpoint =
                    DATA_QUEUE_COMMAND_ENDPOINT.get_or_init(|| "DataEngine.queue_execute".into());
                *endpoint
            }

            /// Returns the `DataEngine.execute` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn data_engine_execute() -> MStr<Endpoint> {
                let endpoint = DATA_EXECUTE_ENDPOINT.get_or_init(|| "DataEngine.execute".into());
                *endpoint
            }

            /// Returns the `DataEngine.process` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn data_engine_process() -> MStr<Endpoint> {
                let endpoint =
                    DATA_PROCESS_ANY_ENDPOINT.get_or_init(|| "DataEngine.process".into());
                *endpoint
            }

            /// Returns the `DataEngine.process_data` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn data_engine_process_data() -> MStr<Endpoint> {
                let endpoint =
                    DATA_PROCESS_DATA_ENDPOINT.get_or_init(|| "DataEngine.process_data".into());
                *endpoint
            }

            /// Returns the `DataEngine.process_defi_data` endpoint, initializing it on
            /// first call. Only compiled with the `defi` feature.
            #[cfg(feature = "defi")]
            #[inline]
            #[must_use]
            pub fn data_engine_process_defi_data() -> MStr<Endpoint> {
                let endpoint = DATA_PROCESS_DEFI_DATA_ENDPOINT
                    .get_or_init(|| "DataEngine.process_defi_data".into());
                *endpoint
            }

            /// Returns the `DataEngine.response` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn data_engine_response() -> MStr<Endpoint> {
                let endpoint = DATA_RESPONSE_ENDPOINT.get_or_init(|| "DataEngine.response".into());
                *endpoint
            }

            /// Returns the `ExecEngine.execute` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn exec_engine_execute() -> MStr<Endpoint> {
                let endpoint = EXEC_EXECUTE_ENDPOINT.get_or_init(|| "ExecEngine.execute".into());
                *endpoint
            }

            /// Returns the `ExecEngine.queue_execute` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn exec_engine_queue_execute() -> MStr<Endpoint> {
                let endpoint =
                    EXEC_QUEUE_COMMAND_ENDPOINT.get_or_init(|| "ExecEngine.queue_execute".into());
                *endpoint
            }

            /// Returns the `ExecEngine.process` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn exec_engine_process() -> MStr<Endpoint> {
                let endpoint = EXEC_PROCESS_ENDPOINT.get_or_init(|| "ExecEngine.process".into());
                *endpoint
            }

            /// Returns the `ExecEngine.reconcile_execution_report` endpoint, initializing
            /// it on first call.
            #[inline]
            #[must_use]
            pub fn exec_engine_reconcile_execution_report() -> MStr<Endpoint> {
                let endpoint = EXEC_RECONCILE_REPORT_ENDPOINT
                    .get_or_init(|| "ExecEngine.reconcile_execution_report".into());
                *endpoint
            }

            /// Returns the `RiskEngine.execute` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn risk_engine_execute() -> MStr<Endpoint> {
                let endpoint = RISK_EXECUTE_ENDPOINT.get_or_init(|| "RiskEngine.execute".into());
                *endpoint
            }

            /// Returns the `RiskEngine.process` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn risk_engine_process() -> MStr<Endpoint> {
                let endpoint = RISK_PROCESS_ENDPOINT.get_or_init(|| "RiskEngine.process".into());
                *endpoint
            }

            /// Returns the `OrderEmulator.execute` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn order_emulator_execute() -> MStr<Endpoint> {
                let endpoint =
                    ORDER_EMULATOR_ENDPOINT.get_or_init(|| "OrderEmulator.execute".into());
                *endpoint
            }

            /// Returns the `Portfolio.update_account` endpoint, initializing it on first call.
            #[inline]
            #[must_use]
            pub fn portfolio_update_account() -> MStr<Endpoint> {
                let endpoint =
                    PORTFOLIO_ACCOUNT_ENDPOINT.get_or_init(|| "Portfolio.update_account".into());
                *endpoint
            }

            $(
                /// Returns the topic for the given arguments.
                ///
                /// The cache key is always built from the arguments; the topic string is
                /// only formatted when the key is not cached yet, and is stored for reuse.
                #[must_use]
                pub fn $method(&mut self, $($arg_name: $arg_ty),*) -> MStr<Topic> {
                    let cached = self
                        .$field
                        .entry($key_expr)
                        .or_insert_with(|| format!($val_fmt, $($val_args),*).into());
                    *cached
                }
            )*
        }
    };
}
176
// Topic families of `MessagingSwitchboard`.
//
// Each entry reads:
//   <cache field>: <key type>,
//   <getter>(<args>) -> <cache key expression>,
//   <format template>, <format args>;
//
// Only `//` comments are used in this invocation: `///` doc comments would be passed
// to the macro as tokens and would not match its pattern.
define_switchboard! {
    // Custom data, keyed by a clone of the data type; topic uses `data_type.topic()`.
    custom_topics: DataType,
    get_custom_topic(data_type: &DataType) -> data_type.clone(),
    "data.{}", data_type.topic();

    // All instruments of a venue.
    instruments_topics: Venue,
    get_instruments_topic(venue: Venue) -> venue,
    "data.instrument.{}", venue;

    // A single instrument (venue first, then symbol).
    instrument_topics: InstrumentId,
    get_instrument_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.instrument.{}.{}", instrument_id.venue, instrument_id.symbol;

    // Order book deltas.
    book_deltas_topics: InstrumentId,
    get_book_deltas_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.book.deltas.{}.{}", instrument_id.venue, instrument_id.symbol;

    // Order book depth-10.
    book_depth10_topics: InstrumentId,
    get_book_depth10_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.book.depth10.{}.{}", instrument_id.venue, instrument_id.symbol;

    // Order book snapshots, keyed by instrument AND interval so each interval gets its own topic.
    book_snapshots_topics: (InstrumentId, NonZeroUsize),
    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> (instrument_id, interval_ms),
    "data.book.snapshots.{}.{}.{}", instrument_id.venue, instrument_id.symbol, interval_ms;

    // Quotes.
    quote_topics: InstrumentId,
    get_quotes_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.quotes.{}.{}", instrument_id.venue, instrument_id.symbol;

    // Trades.
    trade_topics: InstrumentId,
    get_trades_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.trades.{}.{}", instrument_id.venue, instrument_id.symbol;

    // Bars, keyed and formatted by the full bar type.
    bar_topics: BarType,
    get_bars_topic(bar_type: BarType) -> bar_type,
    "data.bars.{}", bar_type;

    // Mark prices.
    mark_price_topics: InstrumentId,
    get_mark_price_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.mark_prices.{}.{}", instrument_id.venue, instrument_id.symbol;

    // Index prices.
    index_price_topics: InstrumentId,
    get_index_price_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.index_prices.{}.{}", instrument_id.venue, instrument_id.symbol;

    // Funding rates.
    funding_rate_topics: InstrumentId,
    get_funding_rate_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.funding_rates.{}.{}", instrument_id.venue, instrument_id.symbol;

    // Instrument status.
    instrument_status_topics: InstrumentId,
    get_instrument_status_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.status.{}.{}", instrument_id.venue, instrument_id.symbol;

    // Instrument close.
    instrument_close_topics: InstrumentId,
    get_instrument_close_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.close.{}.{}", instrument_id.venue, instrument_id.symbol;

    // Order fills; unlike the `data.*` families this formats the whole instrument ID
    // rather than venue and symbol separately.
    order_fills_topics: InstrumentId,
    get_order_fills_topic(instrument_id: InstrumentId) -> instrument_id,
    "events.fills.{}", instrument_id;

    // Order cancels; formats the whole instrument ID.
    order_cancels_topics: InstrumentId,
    get_order_cancels_topic(instrument_id: InstrumentId) -> instrument_id,
    "events.cancels.{}", instrument_id;

    // Order snapshots per client order ID (note the singular `order.` prefix).
    order_snapshots_topics: ClientOrderId,
    get_order_snapshots_topic(client_order_id: ClientOrderId) -> client_order_id,
    "order.snapshots.{}", client_order_id;

    // Position snapshots per position ID (note the plural `positions.` prefix).
    positions_snapshots_topics: PositionId,
    get_positions_snapshots_topic(position_id: PositionId) -> position_id,
    "positions.snapshots.{}", position_id;

    // Order events per strategy.
    event_orders_topics: StrategyId,
    get_event_orders_topic(strategy_id: StrategyId) -> strategy_id,
    "events.order.{}", strategy_id;

    // Position events per strategy.
    event_positions_topics: StrategyId,
    get_event_positions_topic(strategy_id: StrategyId) -> strategy_id,
    "events.position.{}", strategy_id;
}
258
// Generates free functions that forward to the same-named method on the switchboard
// of the message bus returned by `get_message_bus()`.
//
// Each entry is `name(arg: Type, ...) -> ReturnType`; entries are comma-separated and
// a trailing comma is accepted.
macro_rules! define_wrappers {
    ($($method:ident($($arg_name:ident: $arg_ty:ty),*) -> $ret:ty),* $(,)?) => {
        $(
            /// Forwards to the same-named method on the message bus switchboard.
            ///
            /// The bus is mutably borrowed for the duration of the call.
            // NOTE(review): `borrow_mut()` looks like `RefCell` semantics, which would
            // panic if the bus is already borrowed — confirm against `get_message_bus`.
            #[must_use]
            pub fn $method($($arg_name: $arg_ty),*) -> $ret {
                let bus = get_message_bus();
                let mut bus_guard = bus.borrow_mut();
                bus_guard.switchboard.$method($($arg_name),*)
            }
        )*
    }
}
278
// Free-function wrappers, one per topic getter declared in `define_switchboard!`.
// Names, argument lists and return types must match those getters exactly, because
// each wrapper forwards its arguments unchanged to the switchboard method of the
// same name.
define_wrappers! {
    // Custom data and instrument definitions.
    get_custom_topic(data_type: &DataType) -> MStr<Topic>,
    get_instruments_topic(venue: Venue) -> MStr<Topic>,
    get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    // Order book data.
    get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> MStr<Topic>,
    // Market data.
    get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_bars_topic(bar_type: BarType) -> MStr<Topic>,
    get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    // Order and position events and snapshots.
    get_order_fills_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_order_cancels_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic>,
    get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic>,
    get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic>,
    get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic>,
}
301
#[cfg(test)]
mod tests {
    //! Unit tests for the `MessagingSwitchboard` topic getters.
    //!
    //! Each getter test checks the formatted topic and that the result was stored in
    //! the matching cache. Further tests cover cache reuse and key separation.

    use nautilus_model::{
        data::{BarType, DataType},
        identifiers::InstrumentId,
    };
    use rstest::*;

    use super::*;

    /// Fresh switchboard with empty caches for every test.
    #[fixture]
    fn switchboard() -> MessagingSwitchboard {
        MessagingSwitchboard::default()
    }

    /// Instrument with symbol `ESZ24` on venue `XCME`.
    #[fixture]
    fn instrument_id() -> InstrumentId {
        InstrumentId::from("ESZ24.XCME")
    }

    #[rstest]
    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
        let data_type = DataType::new("ExampleDataType", None);
        let expected_topic = "data.ExampleDataType".into();
        let result = switchboard.get_custom_topic(&data_type);
        assert_eq!(result, expected_topic);
        assert!(switchboard.custom_topics.contains_key(&data_type));
    }

    #[rstest]
    fn test_get_instruments_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let venue = instrument_id.venue;
        let expected_topic = "data.instrument.XCME".into();
        let result = switchboard.get_instruments_topic(venue);
        assert_eq!(result, expected_topic);
        assert!(switchboard.instruments_topics.contains_key(&venue));
    }

    #[rstest]
    fn test_get_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = "data.instrument.XCME.ESZ24".into();
        let result = switchboard.get_instrument_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_deltas_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = "data.book.deltas.XCME.ESZ24".into();
        let result = switchboard.get_book_deltas_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_depth10_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = "data.book.depth10.XCME.ESZ24".into();
        let result = switchboard.get_book_depth10_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_snapshots_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = "data.book.snapshots.XCME.ESZ24.1000".into();
        let interval_ms = NonZeroUsize::new(1000).unwrap();
        let result = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);
        assert_eq!(result, expected_topic);

        assert!(
            switchboard
                .book_snapshots_topics
                .contains_key(&(instrument_id, interval_ms))
        );
    }

    #[rstest]
    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected_topic = "data.quotes.XCME.ESZ24".into();
        let result = switchboard.get_quotes_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.quote_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected_topic = "data.trades.XCME.ESZ24".into();
        let result = switchboard.get_trades_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.trade_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
        let expected_topic = format!("data.bars.{bar_type}").into();
        let result = switchboard.get_bars_topic(bar_type);
        assert_eq!(result, expected_topic);
        assert!(switchboard.bar_topics.contains_key(&bar_type));
    }

    #[rstest]
    fn test_get_mark_price_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = "data.mark_prices.XCME.ESZ24".into();
        let result = switchboard.get_mark_price_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.mark_price_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_index_price_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = "data.index_prices.XCME.ESZ24".into();
        let result = switchboard.get_index_price_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.index_price_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_funding_rate_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = "data.funding_rates.XCME.ESZ24".into();
        let result = switchboard.get_funding_rate_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.funding_rate_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_instrument_status_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = "data.status.XCME.ESZ24".into();
        let result = switchboard.get_instrument_status_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(
            switchboard
                .instrument_status_topics
                .contains_key(&instrument_id)
        );
    }

    #[rstest]
    fn test_get_instrument_close_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = "data.close.XCME.ESZ24".into();
        let result = switchboard.get_instrument_close_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(
            switchboard
                .instrument_close_topics
                .contains_key(&instrument_id)
        );
    }

    #[rstest]
    fn test_get_order_fills_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        // This family formats the whole instrument ID, so build the expectation the same way.
        let expected_topic = format!("events.fills.{instrument_id}").into();
        let result = switchboard.get_order_fills_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.order_fills_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_order_cancels_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected_topic = format!("events.cancels.{instrument_id}").into();
        let result = switchboard.get_order_cancels_topic(instrument_id);
        assert_eq!(result, expected_topic);
        assert!(switchboard.order_cancels_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
        let client_order_id = ClientOrderId::from("O-123456789");
        let expected_topic = format!("order.snapshots.{client_order_id}").into();
        let result = switchboard.get_order_snapshots_topic(client_order_id);
        assert_eq!(result, expected_topic);
        assert!(
            switchboard
                .order_snapshots_topics
                .contains_key(&client_order_id)
        );
    }

    #[rstest]
    fn test_repeated_lookup_reuses_cached_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let first = switchboard.get_quotes_topic(instrument_id);
        let second = switchboard.get_quotes_topic(instrument_id);
        assert_eq!(first, second);
        // The second call must hit the existing entry rather than add another.
        assert_eq!(switchboard.quote_topics.len(), 1);
    }

    #[rstest]
    fn test_book_snapshots_intervals_get_distinct_topics(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let fast = NonZeroUsize::new(100).unwrap();
        let slow = NonZeroUsize::new(1000).unwrap();
        let fast_topic = switchboard.get_book_snapshots_topic(instrument_id, fast);
        let slow_topic = switchboard.get_book_snapshots_topic(instrument_id, slow);
        assert_ne!(fast_topic, slow_topic);
        assert_eq!(switchboard.book_snapshots_topics.len(), 2);
    }

    #[rstest]
    fn test_topic_families_use_separate_caches(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let quotes = switchboard.get_quotes_topic(instrument_id);
        let trades = switchboard.get_trades_topic(instrument_id);
        assert_ne!(quotes, trades);
        // Same key, different family: each cache holds only its own entry.
        assert_eq!(switchboard.quote_topics.len(), 1);
        assert_eq!(switchboard.trade_topics.len(), 1);
        assert!(switchboard.book_deltas_topics.is_empty());
    }
}