Skip to main content

nautilus_common/msgbus/
switchboard.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{num::NonZeroUsize, sync::OnceLock};
17
18use ahash::AHashMap;
19use nautilus_model::{
20    data::{BarType, DataType},
21    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
22};
23
24use super::mstr::{Endpoint, MStr, Topic};
25use crate::msgbus::get_message_bus;
26
27pub const CLOSE_TOPIC: &str = "CLOSE";
28
29static DATA_QUEUE_COMMAND_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
30static DATA_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
31static DATA_PROCESS_ANY_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
32static DATA_PROCESS_DATA_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
33#[cfg(feature = "defi")]
34static DATA_PROCESS_DEFI_DATA_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
35static DATA_RESPONSE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
36static EXEC_QUEUE_COMMAND_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
37static EXEC_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
38static EXEC_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
39static EXEC_RECONCILE_REPORT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
40static RISK_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
41static RISK_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
42static ORDER_EMULATOR_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
43static PORTFOLIO_ACCOUNT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
44
/// Generates the zero-argument accessors for the fixed endpoint names.
///
/// Each entry is written `accessor => CELL, "Endpoint.name";` and may be preceded by
/// attributes (doc comments, `#[cfg(...)]`). The generated function copies the value out
/// of `CELL`, building it from the string literal on the first call.
macro_rules! static_endpoint_accessors {
    ($(
        $(#[$attr:meta])*
        $accessor:ident => $cell:ident, $name:literal
    );* $(;)?) => {
        $(
            $(#[$attr])*
            #[inline]
            #[must_use]
            pub fn $accessor() -> MStr<Endpoint> {
                *$cell.get_or_init(|| $name.into())
            }
        )*
    };
}

/// Generates `MessagingSwitchboard`, its `Default` impl, the static endpoint accessors
/// and one caching topic getter per entry.
///
/// Each entry supplies, in order:
/// - the cache field name and its key type,
/// - the getter name with its parameters, followed by `->` and the expression that
///   builds the cache key from those parameters,
/// - the `format!` pattern for the topic string,
/// - the arguments for that pattern.
macro_rules! define_switchboard {
    ($(
        $cache:ident: $key:ty,
        $getter:ident($($param:ident: $param_ty:ty),*) -> $make_key:expr,
        $pattern:expr,
        $($pattern_arg:expr),*
    );* $(;)?) => {
        /// Represents a switchboard of built-in messaging endpoint names.
        #[derive(Clone, Debug)]
        pub struct MessagingSwitchboard {
            $(
                $cache: AHashMap<$key, MStr<Topic>>,
            )*
            #[cfg(feature = "defi")]
            pub(crate) defi: crate::defi::switchboard::DefiSwitchboard,
        }

        impl Default for MessagingSwitchboard {
            /// Creates a new default [`MessagingSwitchboard`] instance with empty topic caches.
            fn default() -> Self {
                Self {
                    $(
                        $cache: AHashMap::new(),
                    )*
                    #[cfg(feature = "defi")]
                    defi: crate::defi::switchboard::DefiSwitchboard::default(),
                }
            }
        }

        impl MessagingSwitchboard {
            // Static endpoints
            static_endpoint_accessors! {
                /// Returns the `DataEngine.queue_execute` endpoint.
                data_engine_queue_execute =>
                    DATA_QUEUE_COMMAND_ENDPOINT, "DataEngine.queue_execute";

                /// Returns the `DataEngine.execute` endpoint.
                data_engine_execute =>
                    DATA_EXECUTE_ENDPOINT, "DataEngine.execute";

                /// Returns the `DataEngine.process` endpoint.
                data_engine_process =>
                    DATA_PROCESS_ANY_ENDPOINT, "DataEngine.process";

                /// Returns the `DataEngine.process_data` endpoint.
                data_engine_process_data =>
                    DATA_PROCESS_DATA_ENDPOINT, "DataEngine.process_data";

                /// Returns the `DataEngine.process_defi_data` endpoint.
                #[cfg(feature = "defi")]
                data_engine_process_defi_data =>
                    DATA_PROCESS_DEFI_DATA_ENDPOINT, "DataEngine.process_defi_data";

                /// Returns the `DataEngine.response` endpoint.
                data_engine_response =>
                    DATA_RESPONSE_ENDPOINT, "DataEngine.response";

                /// Returns the `ExecEngine.execute` endpoint.
                exec_engine_execute =>
                    EXEC_EXECUTE_ENDPOINT, "ExecEngine.execute";

                /// Returns the `ExecEngine.queue_execute` endpoint.
                exec_engine_queue_execute =>
                    EXEC_QUEUE_COMMAND_ENDPOINT, "ExecEngine.queue_execute";

                /// Returns the `ExecEngine.process` endpoint.
                exec_engine_process =>
                    EXEC_PROCESS_ENDPOINT, "ExecEngine.process";

                /// Returns the `ExecEngine.reconcile_execution_report` endpoint.
                exec_engine_reconcile_execution_report =>
                    EXEC_RECONCILE_REPORT_ENDPOINT, "ExecEngine.reconcile_execution_report";

                /// Returns the `RiskEngine.execute` endpoint.
                risk_engine_execute =>
                    RISK_EXECUTE_ENDPOINT, "RiskEngine.execute";

                /// Returns the `RiskEngine.process` endpoint.
                risk_engine_process =>
                    RISK_PROCESS_ENDPOINT, "RiskEngine.process";

                /// Returns the `OrderEmulator.execute` endpoint.
                order_emulator_execute =>
                    ORDER_EMULATOR_ENDPOINT, "OrderEmulator.execute";

                /// Returns the `Portfolio.update_account` endpoint.
                portfolio_update_account =>
                    PORTFOLIO_ACCOUNT_ENDPOINT, "Portfolio.update_account";
            }

            // Dynamic topics
            $(
                /// Returns the topic for the given arguments.
                ///
                /// The topic string is formatted and stored in this switchboard the first
                /// time a key is requested; later calls with the same key return the
                /// stored value.
                #[must_use]
                pub fn $getter(&mut self, $($param: $param_ty),*) -> MStr<Topic> {
                    let lookup_key = $make_key;
                    let cached = self
                        .$cache
                        .entry(lookup_key)
                        .or_insert_with(|| format!($pattern, $($pattern_arg),*).into());
                    *cached
                }
            )*
        }
    };
}
176
177define_switchboard! {
178    custom_topics: DataType,
179    get_custom_topic(data_type: &DataType) -> data_type.clone(),
180    "data.{}", data_type.topic();
181
182    instruments_topics: Venue,
183    get_instruments_topic(venue: Venue) -> venue,
184    "data.instrument.{}", venue;
185
186    instrument_topics: InstrumentId,
187    get_instrument_topic(instrument_id: InstrumentId) -> instrument_id,
188    "data.instrument.{}.{}", instrument_id.venue, instrument_id.symbol;
189
190    book_deltas_topics: InstrumentId,
191    get_book_deltas_topic(instrument_id: InstrumentId) -> instrument_id,
192    "data.book.deltas.{}.{}", instrument_id.venue, instrument_id.symbol;
193
194    book_depth10_topics: InstrumentId,
195    get_book_depth10_topic(instrument_id: InstrumentId) -> instrument_id,
196    "data.book.depth10.{}.{}", instrument_id.venue, instrument_id.symbol;
197
198    book_snapshots_topics: (InstrumentId, NonZeroUsize),
199    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> (instrument_id, interval_ms),
200    "data.book.snapshots.{}.{}.{}", instrument_id.venue, instrument_id.symbol, interval_ms;
201
202    quote_topics: InstrumentId,
203    get_quotes_topic(instrument_id: InstrumentId) -> instrument_id,
204    "data.quotes.{}.{}", instrument_id.venue, instrument_id.symbol;
205
206    trade_topics: InstrumentId,
207    get_trades_topic(instrument_id: InstrumentId) -> instrument_id,
208    "data.trades.{}.{}", instrument_id.venue, instrument_id.symbol;
209
210    bar_topics: BarType,
211    get_bars_topic(bar_type: BarType) -> bar_type,
212    "data.bars.{}", bar_type;
213
214    mark_price_topics: InstrumentId,
215    get_mark_price_topic(instrument_id: InstrumentId) -> instrument_id,
216    "data.mark_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
217
218    index_price_topics: InstrumentId,
219    get_index_price_topic(instrument_id: InstrumentId) -> instrument_id,
220    "data.index_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
221
222    funding_rate_topics: InstrumentId,
223    get_funding_rate_topic(instrument_id: InstrumentId) -> instrument_id,
224    "data.funding_rates.{}.{}", instrument_id.venue, instrument_id.symbol;
225
226    instrument_status_topics: InstrumentId,
227    get_instrument_status_topic(instrument_id: InstrumentId) -> instrument_id,
228    "data.status.{}.{}", instrument_id.venue, instrument_id.symbol;
229
230    instrument_close_topics: InstrumentId,
231    get_instrument_close_topic(instrument_id: InstrumentId) -> instrument_id,
232    "data.close.{}.{}", instrument_id.venue, instrument_id.symbol;
233
234    order_fills_topics: InstrumentId,
235    get_order_fills_topic(instrument_id: InstrumentId) -> instrument_id,
236    "events.fills.{}", instrument_id;
237
238    order_cancels_topics: InstrumentId,
239    get_order_cancels_topic(instrument_id: InstrumentId) -> instrument_id,
240    "events.cancels.{}", instrument_id;
241
242    order_snapshots_topics: ClientOrderId,
243    get_order_snapshots_topic(client_order_id: ClientOrderId) -> client_order_id,
244    "order.snapshots.{}", client_order_id;
245
246    positions_snapshots_topics: PositionId,
247    get_positions_snapshots_topic(position_id: PositionId) -> position_id,
248    "positions.snapshots.{}", position_id;
249
250    event_orders_topics: StrategyId,
251    get_event_orders_topic(strategy_id: StrategyId) -> strategy_id,
252    "events.order.{}", strategy_id;
253
254    event_positions_topics: StrategyId,
255    get_event_positions_topic(strategy_id: StrategyId) -> strategy_id,
256    "events.position.{}", strategy_id;
257}
258
259////////////////////////////////////////////////////////////////////////////////
260// Topic wrapper functions
261////////////////////////////////////////////////////////////////////////////////
262// These wrapper functions provide convenient access to switchboard topic methods
263// by accessing the thread-local message bus instance.
264
265macro_rules! define_wrappers {
266    ($($method:ident($($arg_name:ident: $arg_ty:ty),*) -> $ret:ty),* $(,)?) => {
267        $(
268            #[must_use]
269            pub fn $method($($arg_name: $arg_ty),*) -> $ret {
270                get_message_bus()
271                    .borrow_mut()
272                    .switchboard
273                    .$method($($arg_name),*)
274            }
275        )*
276    }
277}
278
279define_wrappers! {
280    get_custom_topic(data_type: &DataType) -> MStr<Topic>,
281    get_instruments_topic(venue: Venue) -> MStr<Topic>,
282    get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic>,
283    get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic>,
284    get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic>,
285    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> MStr<Topic>,
286    get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic>,
287    get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic>,
288    get_bars_topic(bar_type: BarType) -> MStr<Topic>,
289    get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
290    get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
291    get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic>,
292    get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic>,
293    get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic>,
294    get_order_fills_topic(instrument_id: InstrumentId) -> MStr<Topic>,
295    get_order_cancels_topic(instrument_id: InstrumentId) -> MStr<Topic>,
296    get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic>,
297    get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic>,
298    get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic>,
299    get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic>,
300}
301
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::{BarType, DataType},
        identifiers::InstrumentId,
    };
    use rstest::*;

    use super::*;

    /// A switchboard whose topic caches start out empty.
    #[fixture]
    fn switchboard() -> MessagingSwitchboard {
        MessagingSwitchboard::default()
    }

    /// Instrument shared by the per-instrument topic tests.
    #[fixture]
    fn instrument_id() -> InstrumentId {
        InstrumentId::from("ESZ24.XCME")
    }

    #[rstest]
    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
        let data_type = DataType::new("ExampleDataType", None);

        let topic = switchboard.get_custom_topic(&data_type);

        let expected: MStr<Topic> = "data.ExampleDataType".into();
        assert_eq!(topic, expected);
        assert!(switchboard.custom_topics.contains_key(&data_type));
    }

    #[rstest]
    fn test_get_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let topic = switchboard.get_instrument_topic(instrument_id);

        let expected: MStr<Topic> = "data.instrument.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_deltas_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let topic = switchboard.get_book_deltas_topic(instrument_id);

        let expected: MStr<Topic> = "data.book.deltas.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_depth10_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let topic = switchboard.get_book_depth10_topic(instrument_id);

        let expected: MStr<Topic> = "data.book.depth10.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_snapshots_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let interval_ms = NonZeroUsize::new(1000).unwrap();

        let topic = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);

        let expected: MStr<Topic> = "data.book.snapshots.XCME.ESZ24.1000".into();
        assert_eq!(topic, expected);

        // The cache key is the (instrument, interval) pair.
        let cache_key = (instrument_id, interval_ms);
        assert!(switchboard.book_snapshots_topics.contains_key(&cache_key));
    }

    #[rstest]
    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let topic = switchboard.get_quotes_topic(instrument_id);

        let expected: MStr<Topic> = "data.quotes.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.quote_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let topic = switchboard.get_trades_topic(instrument_id);

        let expected: MStr<Topic> = "data.trades.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.trade_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");

        let topic = switchboard.get_bars_topic(bar_type);

        let expected: MStr<Topic> = format!("data.bars.{bar_type}").into();
        assert_eq!(topic, expected);
        assert!(switchboard.bar_topics.contains_key(&bar_type));
    }

    #[rstest]
    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
        let client_order_id = ClientOrderId::from("O-123456789");

        let topic = switchboard.get_order_snapshots_topic(client_order_id);

        let expected: MStr<Topic> = format!("order.snapshots.{client_order_id}").into();
        assert_eq!(topic, expected);
        assert!(switchboard.order_snapshots_topics.contains_key(&client_order_id));
    }
}