Skip to main content

nautilus_common/msgbus/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory message bus for intra-process communication.
17//!
18//! # Messaging patterns
19//!
20//! - **Point-to-point**: Send messages to named endpoints via `send_*` functions.
21//! - **Pub/sub**: Publish messages to topics via `publish_*`, subscribers receive
22//!   all messages matching their pattern.
23//! - **Request/response**: Register correlation IDs for response sequence tracking.
24//!
25//! # Architecture
26//!
27//! The bus uses thread-local storage for single-threaded async runtimes. Each
28//! thread gets its own `MessageBus` instance, avoiding synchronization overhead.
29//!
30//! Two routing mechanisms serve different needs:
31//!
32//! - **Typed routing** (`publish_quote`, `subscribe_quotes`): Zero-cost dispatch
33//!   for known types. Handlers receive `&T` directly with no runtime type checking.
34//! - **Any-based routing** (`publish_any`, `subscribe_any`): Flexible dispatch for
35//!   custom types and Python interop. Handlers receive `&dyn Any`.
36//!
37//! See [`core`] module documentation for design decisions and performance details.
38
39mod api;
40pub mod core;
41pub mod database;
42pub mod matching;
43pub mod message;
44pub mod mstr;
45pub mod stubs;
46pub mod switchboard;
47pub mod typed_endpoints;
48pub mod typed_handler;
49pub mod typed_router;
50
51use std::{
52    cell::{OnceCell, RefCell},
53    rc::Rc,
54};
55
56#[cfg(feature = "defi")]
57use nautilus_model::defi::{Block, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap};
58use nautilus_model::{
59    data::{
60        Bar, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas,
61        OrderBookDepth10, QuoteTick, TradeTick,
62    },
63    events::{AccountState, OrderEventAny, PositionEvent},
64    orderbook::OrderBook,
65};
66use smallvec::SmallVec;
67
68pub use self::{
69    api::*,
70    core::{MessageBus, Subscription},
71    message::BusMessage,
72    mstr::{Endpoint, MStr, Pattern, Topic},
73    switchboard::MessagingSwitchboard,
74    typed_endpoints::{EndpointMap, IntoEndpointMap},
75    typed_handler::{
76        CallbackHandler, Handler, IntoHandler, ShareableMessageHandler, TypedHandler,
77        TypedIntoHandler,
78    },
79    typed_router::{TopicRouter, TypedSubscription},
80};
81
82/// Inline capacity for handler buffers before heap allocation.
83pub(super) const HANDLER_BUFFER_CAP: usize = 64;
84
85// MessageBus is designed for single-threaded use within each async runtime.
86// Thread-local storage ensures each thread gets its own instance, eliminating
87// the need for unsafe Send/Sync implementations.
88//
89// Handler buffers provide zero-allocation publish on hot paths.
90// Each buffer stores up to 64 handlers inline before spilling to heap.
91// Publish functions use move-out/move-back to avoid holding RefCell borrows
92// during handler calls (enabling re-entrant publishes).
93thread_local! {
94    pub(super) static MESSAGE_BUS: OnceCell<Rc<RefCell<MessageBus>>> = const { OnceCell::new() };
95
96    pub(super) static ANY_HANDLERS: RefCell<SmallVec<[ShareableMessageHandler; HANDLER_BUFFER_CAP]>> =
97        RefCell::new(SmallVec::new());
98
99    pub(super) static DELTAS_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBookDeltas>; HANDLER_BUFFER_CAP]>> =
100        RefCell::new(SmallVec::new());
101    pub(super) static DEPTH10_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBookDepth10>; HANDLER_BUFFER_CAP]>> =
102        RefCell::new(SmallVec::new());
103    pub(super) static BOOK_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBook>; HANDLER_BUFFER_CAP]>> =
104        RefCell::new(SmallVec::new());
105    pub(super) static QUOTE_HANDLERS: RefCell<SmallVec<[TypedHandler<QuoteTick>; HANDLER_BUFFER_CAP]>> =
106        RefCell::new(SmallVec::new());
107    pub(super) static TRADE_HANDLERS: RefCell<SmallVec<[TypedHandler<TradeTick>; HANDLER_BUFFER_CAP]>> =
108        RefCell::new(SmallVec::new());
109    pub(super) static BAR_HANDLERS: RefCell<SmallVec<[TypedHandler<Bar>; HANDLER_BUFFER_CAP]>> =
110        RefCell::new(SmallVec::new());
111    pub(super) static MARK_PRICE_HANDLERS: RefCell<SmallVec<[TypedHandler<MarkPriceUpdate>; HANDLER_BUFFER_CAP]>> =
112        RefCell::new(SmallVec::new());
113    pub(super) static INDEX_PRICE_HANDLERS: RefCell<SmallVec<[TypedHandler<IndexPriceUpdate>; HANDLER_BUFFER_CAP]>> =
114        RefCell::new(SmallVec::new());
115    pub(super) static FUNDING_RATE_HANDLERS: RefCell<SmallVec<[TypedHandler<FundingRateUpdate>; HANDLER_BUFFER_CAP]>> =
116        RefCell::new(SmallVec::new());
117    pub(super) static GREEKS_HANDLERS: RefCell<SmallVec<[TypedHandler<GreeksData>; HANDLER_BUFFER_CAP]>> =
118        RefCell::new(SmallVec::new());
119    pub(super) static ACCOUNT_STATE_HANDLERS: RefCell<SmallVec<[TypedHandler<AccountState>; HANDLER_BUFFER_CAP]>> =
120        RefCell::new(SmallVec::new());
121    pub(super) static ORDER_EVENT_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderEventAny>; HANDLER_BUFFER_CAP]>> =
122        RefCell::new(SmallVec::new());
123    pub(super) static POSITION_EVENT_HANDLERS: RefCell<SmallVec<[TypedHandler<PositionEvent>; HANDLER_BUFFER_CAP]>> =
124        RefCell::new(SmallVec::new());
125
126    #[cfg(feature = "defi")]
127    pub(super) static DEFI_BLOCK_HANDLERS: RefCell<SmallVec<[TypedHandler<Block>; HANDLER_BUFFER_CAP]>> =
128        RefCell::new(SmallVec::new());
129    #[cfg(feature = "defi")]
130    pub(super) static DEFI_POOL_HANDLERS: RefCell<SmallVec<[TypedHandler<Pool>; HANDLER_BUFFER_CAP]>> =
131        RefCell::new(SmallVec::new());
132    #[cfg(feature = "defi")]
133    pub(super) static DEFI_SWAP_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolSwap>; HANDLER_BUFFER_CAP]>> =
134        RefCell::new(SmallVec::new());
135    #[cfg(feature = "defi")]
136    pub(super) static DEFI_LIQUIDITY_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolLiquidityUpdate>; HANDLER_BUFFER_CAP]>> =
137        RefCell::new(SmallVec::new());
138    #[cfg(feature = "defi")]
139    pub(super) static DEFI_COLLECT_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolFeeCollect>; HANDLER_BUFFER_CAP]>> =
140        RefCell::new(SmallVec::new());
141    #[cfg(feature = "defi")]
142    pub(super) static DEFI_FLASH_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolFlash>; HANDLER_BUFFER_CAP]>> =
143        RefCell::new(SmallVec::new());
144}
145
146/// Sets the thread-local message bus.
147///
148/// # Panics
149///
150/// Panics if a message bus has already been set for this thread.
151pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
152    MESSAGE_BUS.with(|bus| {
153        assert!(
154            bus.set(msgbus).is_ok(),
155            "Failed to set MessageBus: already initialized for this thread"
156        );
157    });
158}
159
160/// Gets the thread-local message bus.
161///
162/// If no message bus has been set for this thread, a default one is created and initialized.
163pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
164    MESSAGE_BUS.with(|bus| {
165        bus.get_or_init(|| {
166            let msgbus = MessageBus::default();
167            Rc::new(RefCell::new(msgbus))
168        })
169        .clone()
170    })
171}