1use std::{num::NonZeroUsize, sync::OnceLock};
17
18use ahash::AHashMap;
19use nautilus_model::{
20 data::{BarType, DataType},
21 identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
22};
23
24use super::core::{Endpoint, MStr, Topic};
25use crate::msgbus::get_message_bus;
26
27pub const CLOSE_TOPIC: &str = "CLOSE";
28
29static DATA_QUEUE_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
36static DATA_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
37static DATA_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
38static DATA_RESPONSE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
39static EXEC_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
40static EXEC_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
41static EXEC_RECONCILE_REPORT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
42static RISK_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
43static RISK_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
44static PORTFOLIO_UPDATE_ACCOUNT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
45
46macro_rules! define_switchboard {
47 ($(
48 $field:ident: $key_ty:ty,
49 $method:ident($($arg_name:ident: $arg_ty:ty),*) -> $key_expr:expr,
50 $val_fmt:expr,
51 $($val_args:expr),*
52 );* $(;)?) => {
53 #[derive(Clone, Debug)]
55 pub struct MessagingSwitchboard {
56 $(
57 $field: AHashMap<$key_ty, MStr<Topic>>,
58 )*
59 #[cfg(feature = "defi")]
60 pub(crate) defi: crate::defi::switchboard::DefiSwitchboard,
61 }
62
63 impl Default for MessagingSwitchboard {
64 fn default() -> Self {
66 Self {
67 $(
68 $field: AHashMap::new(),
69 )*
70 #[cfg(feature = "defi")]
71 defi: crate::defi::switchboard::DefiSwitchboard::default(),
72 }
73 }
74 }
75
76 impl MessagingSwitchboard {
77 #[inline]
79 #[must_use]
80 pub fn data_engine_queue_execute() -> MStr<Endpoint> {
81 *DATA_QUEUE_EXECUTE_ENDPOINT.get_or_init(|| "DataEngine.queue_execute".into())
82 }
83
84 #[inline]
85 #[must_use]
86 pub fn data_engine_execute() -> MStr<Endpoint> {
87 *DATA_EXECUTE_ENDPOINT.get_or_init(|| "DataEngine.execute".into())
88 }
89
90 #[inline]
91 #[must_use]
92 pub fn data_engine_process() -> MStr<Endpoint> {
93 *DATA_PROCESS_ENDPOINT.get_or_init(|| "DataEngine.process".into())
94 }
95
96 #[inline]
97 #[must_use]
98 pub fn data_engine_response() -> MStr<Endpoint> {
99 *DATA_RESPONSE_ENDPOINT.get_or_init(|| "DataEngine.response".into())
100 }
101
102 #[inline]
103 #[must_use]
104 pub fn exec_engine_execute() -> MStr<Endpoint> {
105 *EXEC_EXECUTE_ENDPOINT.get_or_init(|| "ExecEngine.execute".into())
106 }
107
108 #[inline]
109 #[must_use]
110 pub fn exec_engine_process() -> MStr<Endpoint> {
111 *EXEC_PROCESS_ENDPOINT.get_or_init(|| "ExecEngine.process".into())
112 }
113
114 #[inline]
115 #[must_use]
116 pub fn exec_engine_reconcile_execution_report() -> MStr<Endpoint> {
117 *EXEC_RECONCILE_REPORT_ENDPOINT.get_or_init(|| "ExecEngine.reconcile_execution_report".into())
118 }
119
120 #[inline]
121 #[must_use]
122 pub fn risk_engine_execute() -> MStr<Endpoint> {
123 *RISK_EXECUTE_ENDPOINT.get_or_init(|| "RiskEngine.execute".into())
124 }
125
126 #[inline]
127 #[must_use]
128 pub fn risk_engine_process() -> MStr<Endpoint> {
129 *RISK_PROCESS_ENDPOINT.get_or_init(|| "RiskEngine.process".into())
130 }
131
132 #[inline]
133 #[must_use]
134 pub fn portfolio_update_account() -> MStr<Endpoint> {
135 *PORTFOLIO_UPDATE_ACCOUNT_ENDPOINT.get_or_init(|| "Portfolio.update_account".into())
136 }
137
138 $(
140 #[must_use]
141 pub fn $method(&mut self, $($arg_name: $arg_ty),*) -> MStr<Topic> {
142 let key = $key_expr;
143 *self.$field
144 .entry(key)
145 .or_insert_with(|| format!($val_fmt, $($val_args),*).into())
146 }
147 )*
148 }
149 };
150}
151
152define_switchboard! {
153 custom_topics: DataType,
154 get_custom_topic(data_type: &DataType) -> data_type.clone(),
155 "data.{}", data_type.topic();
156
157 instruments_topics: Venue,
158 get_instruments_topic(venue: Venue) -> venue,
159 "data.instrument.{}", venue;
160
161 instrument_topics: InstrumentId,
162 get_instrument_topic(instrument_id: InstrumentId) -> instrument_id,
163 "data.instrument.{}.{}", instrument_id.venue, instrument_id.symbol;
164
165 book_deltas_topics: InstrumentId,
166 get_book_deltas_topic(instrument_id: InstrumentId) -> instrument_id,
167 "data.book.deltas.{}.{}", instrument_id.venue, instrument_id.symbol;
168
169 book_depth10_topics: InstrumentId,
170 get_book_depth10_topic(instrument_id: InstrumentId) -> instrument_id,
171 "data.book.depth10.{}.{}", instrument_id.venue, instrument_id.symbol;
172
173 book_snapshots_topics: (InstrumentId, NonZeroUsize),
174 get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> (instrument_id, interval_ms),
175 "data.book.snapshots.{}.{}.{}", instrument_id.venue, instrument_id.symbol, interval_ms;
176
177 quote_topics: InstrumentId,
178 get_quotes_topic(instrument_id: InstrumentId) -> instrument_id,
179 "data.quotes.{}.{}", instrument_id.venue, instrument_id.symbol;
180
181 trade_topics: InstrumentId,
182 get_trades_topic(instrument_id: InstrumentId) -> instrument_id,
183 "data.trades.{}.{}", instrument_id.venue, instrument_id.symbol;
184
185 bar_topics: BarType,
186 get_bars_topic(bar_type: BarType) -> bar_type,
187 "data.bars.{}", bar_type;
188
189 mark_price_topics: InstrumentId,
190 get_mark_price_topic(instrument_id: InstrumentId) -> instrument_id,
191 "data.mark_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
192
193 index_price_topics: InstrumentId,
194 get_index_price_topic(instrument_id: InstrumentId) -> instrument_id,
195 "data.index_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
196
197 funding_rate_topics: InstrumentId,
198 get_funding_rate_topic(instrument_id: InstrumentId) -> instrument_id,
199 "data.funding_rates.{}.{}", instrument_id.venue, instrument_id.symbol;
200
201 instrument_status_topics: InstrumentId,
202 get_instrument_status_topic(instrument_id: InstrumentId) -> instrument_id,
203 "data.status.{}.{}", instrument_id.venue, instrument_id.symbol;
204
205 instrument_close_topics: InstrumentId,
206 get_instrument_close_topic(instrument_id: InstrumentId) -> instrument_id,
207 "data.close.{}.{}", instrument_id.venue, instrument_id.symbol;
208
209 order_fills_topics: InstrumentId,
210 get_order_fills_topic(instrument_id: InstrumentId) -> instrument_id,
211 "events.fills.{}", instrument_id;
212
213 order_cancels_topics: InstrumentId,
214 get_order_cancels_topic(instrument_id: InstrumentId) -> instrument_id,
215 "events.cancels.{}", instrument_id;
216
217 order_snapshots_topics: ClientOrderId,
218 get_order_snapshots_topic(client_order_id: ClientOrderId) -> client_order_id,
219 "order.snapshots.{}", client_order_id;
220
221 positions_snapshots_topics: PositionId,
222 get_positions_snapshots_topic(position_id: PositionId) -> position_id,
223 "positions.snapshots.{}", position_id;
224
225 event_orders_topics: StrategyId,
226 get_event_orders_topic(strategy_id: StrategyId) -> strategy_id,
227 "events.order.{}", strategy_id;
228
229 event_positions_topics: StrategyId,
230 get_event_positions_topic(strategy_id: StrategyId) -> strategy_id,
231 "events.position.{}", strategy_id;
232}
233
234macro_rules! define_wrappers {
241 ($($method:ident($($arg_name:ident: $arg_ty:ty),*) -> $ret:ty),* $(,)?) => {
242 $(
243 #[must_use]
244 pub fn $method($($arg_name: $arg_ty),*) -> $ret {
245 get_message_bus()
246 .borrow_mut()
247 .switchboard
248 .$method($($arg_name),*)
249 }
250 )*
251 }
252}
253
254define_wrappers! {
255 get_custom_topic(data_type: &DataType) -> MStr<Topic>,
256 get_instruments_topic(venue: Venue) -> MStr<Topic>,
257 get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic>,
258 get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic>,
259 get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic>,
260 get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> MStr<Topic>,
261 get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic>,
262 get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic>,
263 get_bars_topic(bar_type: BarType) -> MStr<Topic>,
264 get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
265 get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
266 get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic>,
267 get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic>,
268 get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic>,
269 get_order_fills_topic(instrument_id: InstrumentId) -> MStr<Topic>,
270 get_order_cancels_topic(instrument_id: InstrumentId) -> MStr<Topic>,
271 get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic>,
272 get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic>,
273 get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic>,
274 get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic>,
275}
276
277#[cfg(test)]
278mod tests {
279 use nautilus_model::{
280 data::{BarType, DataType},
281 identifiers::InstrumentId,
282 };
283 use rstest::*;
284
285 use super::*;
286
287 #[fixture]
288 fn switchboard() -> MessagingSwitchboard {
289 MessagingSwitchboard::default()
290 }
291
292 #[fixture]
293 fn instrument_id() -> InstrumentId {
294 InstrumentId::from("ESZ24.XCME")
295 }
296
297 #[rstest]
298 fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
299 let data_type = DataType::new("ExampleDataType", None);
300 let expected_topic = "data.ExampleDataType".into();
301 let result = switchboard.get_custom_topic(&data_type);
302 assert_eq!(result, expected_topic);
303 assert!(switchboard.custom_topics.contains_key(&data_type));
304 }
305
306 #[rstest]
307 fn test_get_instrument_topic(
308 mut switchboard: MessagingSwitchboard,
309 instrument_id: InstrumentId,
310 ) {
311 let expected_topic = "data.instrument.XCME.ESZ24".into();
312 let result = switchboard.get_instrument_topic(instrument_id);
313 assert_eq!(result, expected_topic);
314 assert!(switchboard.instrument_topics.contains_key(&instrument_id));
315 }
316
317 #[rstest]
318 fn test_get_book_deltas_topic(
319 mut switchboard: MessagingSwitchboard,
320 instrument_id: InstrumentId,
321 ) {
322 let expected_topic = "data.book.deltas.XCME.ESZ24".into();
323 let result = switchboard.get_book_deltas_topic(instrument_id);
324 assert_eq!(result, expected_topic);
325 assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
326 }
327
328 #[rstest]
329 fn test_get_book_depth10_topic(
330 mut switchboard: MessagingSwitchboard,
331 instrument_id: InstrumentId,
332 ) {
333 let expected_topic = "data.book.depth10.XCME.ESZ24".into();
334 let result = switchboard.get_book_depth10_topic(instrument_id);
335 assert_eq!(result, expected_topic);
336 assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
337 }
338
339 #[rstest]
340 fn test_get_book_snapshots_topic(
341 mut switchboard: MessagingSwitchboard,
342 instrument_id: InstrumentId,
343 ) {
344 let expected_topic = "data.book.snapshots.XCME.ESZ24.1000".into();
345 let interval_ms = NonZeroUsize::new(1000).unwrap();
346 let result = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);
347 assert_eq!(result, expected_topic);
348
349 assert!(
350 switchboard
351 .book_snapshots_topics
352 .contains_key(&(instrument_id, interval_ms))
353 );
354 }
355
356 #[rstest]
357 fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
358 let expected_topic = "data.quotes.XCME.ESZ24".into();
359 let result = switchboard.get_quotes_topic(instrument_id);
360 assert_eq!(result, expected_topic);
361 assert!(switchboard.quote_topics.contains_key(&instrument_id));
362 }
363
364 #[rstest]
365 fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
366 let expected_topic = "data.trades.XCME.ESZ24".into();
367 let result = switchboard.get_trades_topic(instrument_id);
368 assert_eq!(result, expected_topic);
369 assert!(switchboard.trade_topics.contains_key(&instrument_id));
370 }
371
372 #[rstest]
373 fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
374 let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
375 let expected_topic = format!("data.bars.{bar_type}").into();
376 let result = switchboard.get_bars_topic(bar_type);
377 assert_eq!(result, expected_topic);
378 assert!(switchboard.bar_topics.contains_key(&bar_type));
379 }
380
381 #[rstest]
382 fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
383 let client_order_id = ClientOrderId::from("O-123456789");
384 let expected_topic = format!("order.snapshots.{client_order_id}").into();
385 let result = switchboard.get_order_snapshots_topic(client_order_id);
386 assert_eq!(result, expected_topic);
387 assert!(
388 switchboard
389 .order_snapshots_topics
390 .contains_key(&client_order_id)
391 );
392 }
393}