nautilus_common/msgbus/
switchboard.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{num::NonZeroUsize, sync::OnceLock};
17
18use ahash::AHashMap;
19use nautilus_model::{
20    data::{BarType, DataType},
21    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
22};
23
24use super::core::{Endpoint, MStr, Topic};
25use crate::msgbus::get_message_bus;
26
27pub const CLOSE_TOPIC: &str = "CLOSE";
28
29////////////////////////////////////////////////////////////////////////////////
30// Built-in endpoint constants
31////////////////////////////////////////////////////////////////////////////////
32// These are static endpoint addresses.
33// They use OnceLock for thread-safe lazy initialization without instance state.
34
35static DATA_QUEUE_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
36static DATA_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
37static DATA_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
38static DATA_RESPONSE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
39static EXEC_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
40static EXEC_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
41static EXEC_RECONCILE_REPORT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
42static RISK_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
43static RISK_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
44static PORTFOLIO_UPDATE_ACCOUNT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
45
// Generates `MessagingSwitchboard` from a `;`-separated list of entries.
//
// Each entry has the shape:
//
//     <cache field>: <key type>,
//     <accessor>(<args>) -> <key expression>,
//     <format string>, <format arguments>;
//
// For every entry the macro emits one `AHashMap<key type, MStr<Topic>>` field
// and one `&mut self` accessor that looks the key up in that map, formatting
// and storing the topic only when the key is not yet present.
macro_rules! define_switchboard {
    ($(
        $field:ident: $key_ty:ty,
        $method:ident($($arg_name:ident: $arg_ty:ty),*) -> $key_expr:expr,
        $val_fmt:expr,
        $($val_args:expr),*
    );* $(;)?) => {
        /// Switchboard of built-in messaging endpoint addresses and per-key topic caches.
        ///
        /// Endpoint addresses are exposed through associated functions that need
        /// no instance; topics are built per key and remembered in one map per
        /// topic family.
        #[derive(Clone, Debug)]
        pub struct MessagingSwitchboard {
            $(
                $field: AHashMap<$key_ty, MStr<Topic>>,
            )*
            #[cfg(feature = "defi")]
            pub(crate) defi: crate::defi::switchboard::DefiSwitchboard,
        }

        impl Default for MessagingSwitchboard {
            /// Creates a [`MessagingSwitchboard`] whose topic caches are all empty.
            fn default() -> Self {
                Self {
                    $(
                        $field: AHashMap::new(),
                    )*
                    #[cfg(feature = "defi")]
                    defi: crate::defi::switchboard::DefiSwitchboard::default(),
                }
            }
        }

        impl MessagingSwitchboard {
            /// Reads `cell`, initializing it from `address` if it is still empty.
            #[inline]
            fn static_endpoint(
                cell: &OnceLock<MStr<Endpoint>>,
                address: &'static str,
            ) -> MStr<Endpoint> {
                let endpoint = cell.get_or_init(|| address.into());
                *endpoint
            }

            // Static endpoints

            /// Returns the `DataEngine.queue_execute` endpoint address.
            #[inline]
            #[must_use]
            pub fn data_engine_queue_execute() -> MStr<Endpoint> {
                Self::static_endpoint(&DATA_QUEUE_EXECUTE_ENDPOINT, "DataEngine.queue_execute")
            }

            /// Returns the `DataEngine.execute` endpoint address.
            #[inline]
            #[must_use]
            pub fn data_engine_execute() -> MStr<Endpoint> {
                Self::static_endpoint(&DATA_EXECUTE_ENDPOINT, "DataEngine.execute")
            }

            /// Returns the `DataEngine.process` endpoint address.
            #[inline]
            #[must_use]
            pub fn data_engine_process() -> MStr<Endpoint> {
                Self::static_endpoint(&DATA_PROCESS_ENDPOINT, "DataEngine.process")
            }

            /// Returns the `DataEngine.response` endpoint address.
            #[inline]
            #[must_use]
            pub fn data_engine_response() -> MStr<Endpoint> {
                Self::static_endpoint(&DATA_RESPONSE_ENDPOINT, "DataEngine.response")
            }

            /// Returns the `ExecEngine.execute` endpoint address.
            #[inline]
            #[must_use]
            pub fn exec_engine_execute() -> MStr<Endpoint> {
                Self::static_endpoint(&EXEC_EXECUTE_ENDPOINT, "ExecEngine.execute")
            }

            /// Returns the `ExecEngine.process` endpoint address.
            #[inline]
            #[must_use]
            pub fn exec_engine_process() -> MStr<Endpoint> {
                Self::static_endpoint(&EXEC_PROCESS_ENDPOINT, "ExecEngine.process")
            }

            /// Returns the `ExecEngine.reconcile_execution_report` endpoint address.
            #[inline]
            #[must_use]
            pub fn exec_engine_reconcile_execution_report() -> MStr<Endpoint> {
                Self::static_endpoint(
                    &EXEC_RECONCILE_REPORT_ENDPOINT,
                    "ExecEngine.reconcile_execution_report",
                )
            }

            /// Returns the `RiskEngine.execute` endpoint address.
            #[inline]
            #[must_use]
            pub fn risk_engine_execute() -> MStr<Endpoint> {
                Self::static_endpoint(&RISK_EXECUTE_ENDPOINT, "RiskEngine.execute")
            }

            /// Returns the `RiskEngine.process` endpoint address.
            #[inline]
            #[must_use]
            pub fn risk_engine_process() -> MStr<Endpoint> {
                Self::static_endpoint(&RISK_PROCESS_ENDPOINT, "RiskEngine.process")
            }

            /// Returns the `Portfolio.update_account` endpoint address.
            #[inline]
            #[must_use]
            pub fn portfolio_update_account() -> MStr<Endpoint> {
                Self::static_endpoint(
                    &PORTFOLIO_UPDATE_ACCOUNT_ENDPOINT,
                    "Portfolio.update_account",
                )
            }

            // Dynamic topics
            $(
                /// Returns the topic stored for the given arguments, formatting and
                /// storing it first when the key has not been requested before.
                #[must_use]
                pub fn $method(&mut self, $($arg_name: $arg_ty),*) -> MStr<Topic> {
                    // The format arguments are only evaluated on a cache miss.
                    let cached = self
                        .$field
                        .entry($key_expr)
                        .or_insert_with(|| format!($val_fmt, $($val_args),*).into());
                    *cached
                }
            )*
        }
    };
}
151
152define_switchboard! {
153    custom_topics: DataType,
154    get_custom_topic(data_type: &DataType) -> data_type.clone(),
155    "data.{}", data_type.topic();
156
157    instruments_topics: Venue,
158    get_instruments_topic(venue: Venue) -> venue,
159    "data.instrument.{}", venue;
160
161    instrument_topics: InstrumentId,
162    get_instrument_topic(instrument_id: InstrumentId) -> instrument_id,
163    "data.instrument.{}.{}", instrument_id.venue, instrument_id.symbol;
164
165    book_deltas_topics: InstrumentId,
166    get_book_deltas_topic(instrument_id: InstrumentId) -> instrument_id,
167    "data.book.deltas.{}.{}", instrument_id.venue, instrument_id.symbol;
168
169    book_depth10_topics: InstrumentId,
170    get_book_depth10_topic(instrument_id: InstrumentId) -> instrument_id,
171    "data.book.depth10.{}.{}", instrument_id.venue, instrument_id.symbol;
172
173    book_snapshots_topics: (InstrumentId, NonZeroUsize),
174    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> (instrument_id, interval_ms),
175    "data.book.snapshots.{}.{}.{}", instrument_id.venue, instrument_id.symbol, interval_ms;
176
177    quote_topics: InstrumentId,
178    get_quotes_topic(instrument_id: InstrumentId) -> instrument_id,
179    "data.quotes.{}.{}", instrument_id.venue, instrument_id.symbol;
180
181    trade_topics: InstrumentId,
182    get_trades_topic(instrument_id: InstrumentId) -> instrument_id,
183    "data.trades.{}.{}", instrument_id.venue, instrument_id.symbol;
184
185    bar_topics: BarType,
186    get_bars_topic(bar_type: BarType) -> bar_type,
187    "data.bars.{}", bar_type;
188
189    mark_price_topics: InstrumentId,
190    get_mark_price_topic(instrument_id: InstrumentId) -> instrument_id,
191    "data.mark_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
192
193    index_price_topics: InstrumentId,
194    get_index_price_topic(instrument_id: InstrumentId) -> instrument_id,
195    "data.index_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
196
197    funding_rate_topics: InstrumentId,
198    get_funding_rate_topic(instrument_id: InstrumentId) -> instrument_id,
199    "data.funding_rates.{}.{}", instrument_id.venue, instrument_id.symbol;
200
201    instrument_status_topics: InstrumentId,
202    get_instrument_status_topic(instrument_id: InstrumentId) -> instrument_id,
203    "data.status.{}.{}", instrument_id.venue, instrument_id.symbol;
204
205    instrument_close_topics: InstrumentId,
206    get_instrument_close_topic(instrument_id: InstrumentId) -> instrument_id,
207    "data.close.{}.{}", instrument_id.venue, instrument_id.symbol;
208
209    order_fills_topics: InstrumentId,
210    get_order_fills_topic(instrument_id: InstrumentId) -> instrument_id,
211    "events.fills.{}", instrument_id;
212
213    order_cancels_topics: InstrumentId,
214    get_order_cancels_topic(instrument_id: InstrumentId) -> instrument_id,
215    "events.cancels.{}", instrument_id;
216
217    order_snapshots_topics: ClientOrderId,
218    get_order_snapshots_topic(client_order_id: ClientOrderId) -> client_order_id,
219    "order.snapshots.{}", client_order_id;
220
221    positions_snapshots_topics: PositionId,
222    get_positions_snapshots_topic(position_id: PositionId) -> position_id,
223    "positions.snapshots.{}", position_id;
224
225    event_orders_topics: StrategyId,
226    get_event_orders_topic(strategy_id: StrategyId) -> strategy_id,
227    "events.order.{}", strategy_id;
228
229    event_positions_topics: StrategyId,
230    get_event_positions_topic(strategy_id: StrategyId) -> strategy_id,
231    "events.position.{}", strategy_id;
232}
233
234////////////////////////////////////////////////////////////////////////////////
235// Topic wrapper functions
236////////////////////////////////////////////////////////////////////////////////
237// These wrapper functions provide convenient access to switchboard topic methods
238// by accessing the thread-local message bus instance.
239
// Generates one free function per listed signature. Each function obtains the
// message bus via `get_message_bus()`, borrows it mutably, and forwards its
// arguments to the switchboard method of the same name.
macro_rules! define_wrappers {
    ($($method:ident($($arg_name:ident: $arg_ty:ty),*) -> $ret:ty),* $(,)?) => {
        $(
            /// Forwards to the switchboard method of the same name on the message bus
            /// returned by `get_message_bus()`.
            ///
            /// NOTE(review): the bus is accessed through `borrow_mut()`; if that is a
            /// `RefCell`, calling this while the bus is already borrowed will panic —
            /// confirm against `get_message_bus`.
            #[must_use]
            pub fn $method($($arg_name: $arg_ty),*) -> $ret {
                let bus = get_message_bus();
                let mut bus_guard = bus.borrow_mut();
                bus_guard.switchboard.$method($($arg_name),*)
            }
        )*
    }
}
253
254define_wrappers! {
255    get_custom_topic(data_type: &DataType) -> MStr<Topic>,
256    get_instruments_topic(venue: Venue) -> MStr<Topic>,
257    get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic>,
258    get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic>,
259    get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic>,
260    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> MStr<Topic>,
261    get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic>,
262    get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic>,
263    get_bars_topic(bar_type: BarType) -> MStr<Topic>,
264    get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
265    get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
266    get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic>,
267    get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic>,
268    get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic>,
269    get_order_fills_topic(instrument_id: InstrumentId) -> MStr<Topic>,
270    get_order_cancels_topic(instrument_id: InstrumentId) -> MStr<Topic>,
271    get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic>,
272    get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic>,
273    get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic>,
274    get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic>,
275}
276
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::{BarType, DataType},
        identifiers::InstrumentId,
    };
    use rstest::*;

    use super::*;

    /// A switchboard with every topic cache empty.
    #[fixture]
    fn switchboard() -> MessagingSwitchboard {
        MessagingSwitchboard::default()
    }

    /// Instrument ID shared by the per-instrument topic tests.
    #[fixture]
    fn instrument_id() -> InstrumentId {
        InstrumentId::from("ESZ24.XCME")
    }

    // Every test follows the same shape: request a topic, compare it with the
    // expected string, then confirm the key was recorded in the matching cache.

    #[rstest]
    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
        let data_type = DataType::new("ExampleDataType", None);

        let topic = switchboard.get_custom_topic(&data_type);

        let expected: MStr<Topic> = "data.ExampleDataType".into();
        assert_eq!(topic, expected);
        assert!(switchboard.custom_topics.contains_key(&data_type));
    }

    #[rstest]
    fn test_get_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let topic = switchboard.get_instrument_topic(instrument_id);

        let expected: MStr<Topic> = "data.instrument.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_deltas_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let topic = switchboard.get_book_deltas_topic(instrument_id);

        let expected: MStr<Topic> = "data.book.deltas.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_depth10_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let topic = switchboard.get_book_depth10_topic(instrument_id);

        let expected: MStr<Topic> = "data.book.depth10.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_snapshots_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let interval_ms = NonZeroUsize::new(1000).unwrap();

        let topic = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);

        let expected: MStr<Topic> = "data.book.snapshots.XCME.ESZ24.1000".into();
        assert_eq!(topic, expected);

        // The cache key is the (instrument, interval) pair.
        let cache_key = (instrument_id, interval_ms);
        assert!(switchboard.book_snapshots_topics.contains_key(&cache_key));
    }

    #[rstest]
    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let topic = switchboard.get_quotes_topic(instrument_id);

        let expected: MStr<Topic> = "data.quotes.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.quote_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let topic = switchboard.get_trades_topic(instrument_id);

        let expected: MStr<Topic> = "data.trades.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.trade_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
        let expected: MStr<Topic> = format!("data.bars.{bar_type}").into();

        let topic = switchboard.get_bars_topic(bar_type);

        assert_eq!(topic, expected);
        assert!(switchboard.bar_topics.contains_key(&bar_type));
    }

    #[rstest]
    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
        let client_order_id = ClientOrderId::from("O-123456789");
        let expected: MStr<Topic> = format!("order.snapshots.{client_order_id}").into();

        let topic = switchboard.get_order_snapshots_topic(client_order_id);

        assert_eq!(topic, expected);
        assert!(switchboard.order_snapshots_topics.contains_key(&client_order_id));
    }
}
391        );
392    }
393}