nautilus_common/msgbus/
switchboard.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{num::NonZeroUsize, sync::OnceLock};
17
18use ahash::AHashMap;
19use nautilus_model::{
20    data::{BarType, DataType},
21    identifiers::{ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
22};
23
24use super::core::{Endpoint, MStr, Topic};
25use crate::msgbus::get_message_bus;
26
27pub const CLOSE_TOPIC: &str = "CLOSE";
28
29////////////////////////////////////////////////////////////////////////////////
30// Built-in endpoint constants
31////////////////////////////////////////////////////////////////////////////////
32// These are static endpoint addresses.
33// They use OnceLock for thread-safe lazy initialization without instance state.
34
35static DATA_QUEUE_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
36static DATA_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
37static DATA_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
38static DATA_RESPONSE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
39static EXEC_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
40static EXEC_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
41static EXEC_RECONCILE_REPORT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
42static EXEC_RECONCILE_MASS_STATUS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
43static RISK_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
44static RISK_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
45static PORTFOLIO_UPDATE_ACCOUNT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
46
47macro_rules! define_switchboard {
48    ($(
49        $field:ident: $key_ty:ty,
50        $method:ident($($arg_name:ident: $arg_ty:ty),*) -> $key_expr:expr,
51        $val_fmt:expr,
52        $($val_args:expr),*
53    );* $(;)?) => {
54        /// Represents a switchboard of built-in messaging endpoint names.
55        #[derive(Clone, Debug)]
56        pub struct MessagingSwitchboard {
57            $(
58                $field: AHashMap<$key_ty, MStr<Topic>>,
59            )*
60            #[cfg(feature = "defi")]
61            pub(crate) defi: crate::defi::switchboard::DefiSwitchboard,
62        }
63
64        impl Default for MessagingSwitchboard {
65            /// Creates a new default [`MessagingSwitchboard`] instance.
66            fn default() -> Self {
67                Self {
68                    $(
69                        $field: AHashMap::new(),
70                    )*
71                    #[cfg(feature = "defi")]
72                    defi: crate::defi::switchboard::DefiSwitchboard::default(),
73                }
74            }
75        }
76
77        impl MessagingSwitchboard {
78            // Static endpoints
79            #[inline]
80            #[must_use]
81            pub fn data_engine_queue_execute() -> MStr<Endpoint> {
82                *DATA_QUEUE_EXECUTE_ENDPOINT.get_or_init(|| "DataEngine.queue_execute".into())
83            }
84
85            #[inline]
86            #[must_use]
87            pub fn data_engine_execute() -> MStr<Endpoint> {
88                *DATA_EXECUTE_ENDPOINT.get_or_init(|| "DataEngine.execute".into())
89            }
90
91            #[inline]
92            #[must_use]
93            pub fn data_engine_process() -> MStr<Endpoint> {
94                *DATA_PROCESS_ENDPOINT.get_or_init(|| "DataEngine.process".into())
95            }
96
97            #[inline]
98            #[must_use]
99            pub fn data_engine_response() -> MStr<Endpoint> {
100                *DATA_RESPONSE_ENDPOINT.get_or_init(|| "DataEngine.response".into())
101            }
102
103            #[inline]
104            #[must_use]
105            pub fn exec_engine_execute() -> MStr<Endpoint> {
106                *EXEC_EXECUTE_ENDPOINT.get_or_init(|| "ExecEngine.execute".into())
107            }
108
109            #[inline]
110            #[must_use]
111            pub fn exec_engine_process() -> MStr<Endpoint> {
112                *EXEC_PROCESS_ENDPOINT.get_or_init(|| "ExecEngine.process".into())
113            }
114
115            #[inline]
116            #[must_use]
117            pub fn exec_engine_reconcile_execution_report() -> MStr<Endpoint> {
118                *EXEC_RECONCILE_REPORT_ENDPOINT.get_or_init(|| "ExecEngine.reconcile_execution_report".into())
119            }
120
121            #[inline]
122            #[must_use]
123            pub fn exec_engine_reconcile_execution_mass_status() -> MStr<Endpoint> {
124                *EXEC_RECONCILE_MASS_STATUS_ENDPOINT
125                    .get_or_init(|| "ExecEngine.reconcile_execution_mass_status".into())
126            }
127
128            #[inline]
129            #[must_use]
130            pub fn risk_engine_execute() -> MStr<Endpoint> {
131                *RISK_EXECUTE_ENDPOINT.get_or_init(|| "RiskEngine.execute".into())
132            }
133
134            #[inline]
135            #[must_use]
136            pub fn risk_engine_process() -> MStr<Endpoint> {
137                *RISK_PROCESS_ENDPOINT.get_or_init(|| "RiskEngine.process".into())
138            }
139
140            #[inline]
141            #[must_use]
142            pub fn portfolio_update_account() -> MStr<Endpoint> {
143                *PORTFOLIO_UPDATE_ACCOUNT_ENDPOINT.get_or_init(|| "Portfolio.update_account".into())
144            }
145
146            // Dynamic topics
147            $(
148                #[must_use]
149                pub fn $method(&mut self, $($arg_name: $arg_ty),*) -> MStr<Topic> {
150                    let key = $key_expr;
151                    *self.$field
152                        .entry(key)
153                        .or_insert_with(|| format!($val_fmt, $($val_args),*).into())
154                }
155            )*
156        }
157    };
158}
159
160define_switchboard! {
161    custom_topics: DataType,
162    get_custom_topic(data_type: &DataType) -> data_type.clone(),
163    "data.{}", data_type.topic();
164
165    instruments_topics: Venue,
166    get_instruments_topic(venue: Venue) -> venue,
167    "data.instrument.{}", venue;
168
169    instrument_topics: InstrumentId,
170    get_instrument_topic(instrument_id: InstrumentId) -> instrument_id,
171    "data.instrument.{}.{}", instrument_id.venue, instrument_id.symbol;
172
173    book_deltas_topics: InstrumentId,
174    get_book_deltas_topic(instrument_id: InstrumentId) -> instrument_id,
175    "data.book.deltas.{}.{}", instrument_id.venue, instrument_id.symbol;
176
177    book_depth10_topics: InstrumentId,
178    get_book_depth10_topic(instrument_id: InstrumentId) -> instrument_id,
179    "data.book.depth10.{}.{}", instrument_id.venue, instrument_id.symbol;
180
181    book_snapshots_topics: (InstrumentId, NonZeroUsize),
182    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> (instrument_id, interval_ms),
183    "data.book.snapshots.{}.{}.{}", instrument_id.venue, instrument_id.symbol, interval_ms;
184
185    quote_topics: InstrumentId,
186    get_quotes_topic(instrument_id: InstrumentId) -> instrument_id,
187    "data.quotes.{}.{}", instrument_id.venue, instrument_id.symbol;
188
189    trade_topics: InstrumentId,
190    get_trades_topic(instrument_id: InstrumentId) -> instrument_id,
191    "data.trades.{}.{}", instrument_id.venue, instrument_id.symbol;
192
193    bar_topics: BarType,
194    get_bars_topic(bar_type: BarType) -> bar_type,
195    "data.bars.{}", bar_type;
196
197    mark_price_topics: InstrumentId,
198    get_mark_price_topic(instrument_id: InstrumentId) -> instrument_id,
199    "data.mark_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
200
201    index_price_topics: InstrumentId,
202    get_index_price_topic(instrument_id: InstrumentId) -> instrument_id,
203    "data.index_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
204
205    funding_rate_topics: InstrumentId,
206    get_funding_rate_topic(instrument_id: InstrumentId) -> instrument_id,
207    "data.funding_rates.{}.{}", instrument_id.venue, instrument_id.symbol;
208
209    instrument_status_topics: InstrumentId,
210    get_instrument_status_topic(instrument_id: InstrumentId) -> instrument_id,
211    "data.status.{}.{}", instrument_id.venue, instrument_id.symbol;
212
213    instrument_close_topics: InstrumentId,
214    get_instrument_close_topic(instrument_id: InstrumentId) -> instrument_id,
215    "data.close.{}.{}", instrument_id.venue, instrument_id.symbol;
216
217    order_fills_topics: InstrumentId,
218    get_order_fills_topic(instrument_id: InstrumentId) -> instrument_id,
219    "events.fills.{}", instrument_id;
220
221    order_cancels_topics: InstrumentId,
222    get_order_cancels_topic(instrument_id: InstrumentId) -> instrument_id,
223    "events.cancels.{}", instrument_id;
224
225    order_snapshots_topics: ClientOrderId,
226    get_order_snapshots_topic(client_order_id: ClientOrderId) -> client_order_id,
227    "order.snapshots.{}", client_order_id;
228
229    positions_snapshots_topics: PositionId,
230    get_positions_snapshots_topic(position_id: PositionId) -> position_id,
231    "positions.snapshots.{}", position_id;
232
233    event_orders_topics: StrategyId,
234    get_event_orders_topic(strategy_id: StrategyId) -> strategy_id,
235    "events.order.{}", strategy_id;
236
237    event_positions_topics: StrategyId,
238    get_event_positions_topic(strategy_id: StrategyId) -> strategy_id,
239    "events.position.{}", strategy_id;
240}
241
242////////////////////////////////////////////////////////////////////////////////
243// Topic wrapper functions
244////////////////////////////////////////////////////////////////////////////////
245// These wrapper functions provide convenient access to switchboard topic methods
246// by accessing the thread-local message bus instance.
247
248macro_rules! define_wrappers {
249    ($($method:ident($($arg_name:ident: $arg_ty:ty),*) -> $ret:ty),* $(,)?) => {
250        $(
251            #[must_use]
252            pub fn $method($($arg_name: $arg_ty),*) -> $ret {
253                get_message_bus()
254                    .borrow_mut()
255                    .switchboard
256                    .$method($($arg_name),*)
257            }
258        )*
259    }
260}
261
262define_wrappers! {
263    get_custom_topic(data_type: &DataType) -> MStr<Topic>,
264    get_instruments_topic(venue: Venue) -> MStr<Topic>,
265    get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic>,
266    get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic>,
267    get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic>,
268    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> MStr<Topic>,
269    get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic>,
270    get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic>,
271    get_bars_topic(bar_type: BarType) -> MStr<Topic>,
272    get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
273    get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
274    get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic>,
275    get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic>,
276    get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic>,
277    get_order_fills_topic(instrument_id: InstrumentId) -> MStr<Topic>,
278    get_order_cancels_topic(instrument_id: InstrumentId) -> MStr<Topic>,
279    get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic>,
280    get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic>,
281    get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic>,
282    get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic>,
283}
284
285#[cfg(test)]
286mod tests {
287    use nautilus_model::{
288        data::{BarType, DataType},
289        identifiers::InstrumentId,
290    };
291    use rstest::*;
292
293    use super::*;
294
295    #[fixture]
296    fn switchboard() -> MessagingSwitchboard {
297        MessagingSwitchboard::default()
298    }
299
300    #[fixture]
301    fn instrument_id() -> InstrumentId {
302        InstrumentId::from("ESZ24.XCME")
303    }
304
305    #[rstest]
306    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
307        let data_type = DataType::new("ExampleDataType", None);
308        let expected_topic = "data.ExampleDataType".into();
309        let result = switchboard.get_custom_topic(&data_type);
310        assert_eq!(result, expected_topic);
311        assert!(switchboard.custom_topics.contains_key(&data_type));
312    }
313
314    #[rstest]
315    fn test_get_instrument_topic(
316        mut switchboard: MessagingSwitchboard,
317        instrument_id: InstrumentId,
318    ) {
319        let expected_topic = "data.instrument.XCME.ESZ24".into();
320        let result = switchboard.get_instrument_topic(instrument_id);
321        assert_eq!(result, expected_topic);
322        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
323    }
324
325    #[rstest]
326    fn test_get_book_deltas_topic(
327        mut switchboard: MessagingSwitchboard,
328        instrument_id: InstrumentId,
329    ) {
330        let expected_topic = "data.book.deltas.XCME.ESZ24".into();
331        let result = switchboard.get_book_deltas_topic(instrument_id);
332        assert_eq!(result, expected_topic);
333        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
334    }
335
336    #[rstest]
337    fn test_get_book_depth10_topic(
338        mut switchboard: MessagingSwitchboard,
339        instrument_id: InstrumentId,
340    ) {
341        let expected_topic = "data.book.depth10.XCME.ESZ24".into();
342        let result = switchboard.get_book_depth10_topic(instrument_id);
343        assert_eq!(result, expected_topic);
344        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
345    }
346
347    #[rstest]
348    fn test_get_book_snapshots_topic(
349        mut switchboard: MessagingSwitchboard,
350        instrument_id: InstrumentId,
351    ) {
352        let expected_topic = "data.book.snapshots.XCME.ESZ24.1000".into();
353        let interval_ms = NonZeroUsize::new(1000).unwrap();
354        let result = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);
355        assert_eq!(result, expected_topic);
356
357        assert!(
358            switchboard
359                .book_snapshots_topics
360                .contains_key(&(instrument_id, interval_ms))
361        );
362    }
363
364    #[rstest]
365    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
366        let expected_topic = "data.quotes.XCME.ESZ24".into();
367        let result = switchboard.get_quotes_topic(instrument_id);
368        assert_eq!(result, expected_topic);
369        assert!(switchboard.quote_topics.contains_key(&instrument_id));
370    }
371
372    #[rstest]
373    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
374        let expected_topic = "data.trades.XCME.ESZ24".into();
375        let result = switchboard.get_trades_topic(instrument_id);
376        assert_eq!(result, expected_topic);
377        assert!(switchboard.trade_topics.contains_key(&instrument_id));
378    }
379
380    #[rstest]
381    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
382        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
383        let expected_topic = format!("data.bars.{bar_type}").into();
384        let result = switchboard.get_bars_topic(bar_type);
385        assert_eq!(result, expected_topic);
386        assert!(switchboard.bar_topics.contains_key(&bar_type));
387    }
388
389    #[rstest]
390    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
391        let client_order_id = ClientOrderId::from("O-123456789");
392        let expected_topic = format!("order.snapshots.{client_order_id}").into();
393        let result = switchboard.get_order_snapshots_topic(client_order_id);
394        assert_eq!(result, expected_topic);
395        assert!(
396            switchboard
397                .order_snapshots_topics
398                .contains_key(&client_order_id)
399        );
400    }
401}