nautilus_common/msgbus/
mod.rs1mod api;
40pub mod core;
41pub mod database;
42pub mod matching;
43pub mod message;
44pub mod mstr;
45pub mod stubs;
46pub mod switchboard;
47pub mod typed_endpoints;
48pub mod typed_handler;
49pub mod typed_router;
50
51use std::{
52 cell::{OnceCell, RefCell},
53 rc::Rc,
54};
55
56#[cfg(feature = "defi")]
57use nautilus_model::defi::{Block, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap};
58use nautilus_model::{
59 data::{
60 Bar, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas,
61 OrderBookDepth10, QuoteTick, TradeTick,
62 },
63 events::{AccountState, OrderEventAny, PositionEvent},
64 orderbook::OrderBook,
65};
66use smallvec::SmallVec;
67
68pub use self::{
69 api::*,
70 core::{MessageBus, Subscription},
71 message::BusMessage,
72 mstr::{Endpoint, MStr, Pattern, Topic},
73 switchboard::MessagingSwitchboard,
74 typed_endpoints::{EndpointMap, IntoEndpointMap},
75 typed_handler::{
76 CallbackHandler, Handler, IntoHandler, ShareableMessageHandler, TypedHandler,
77 TypedIntoHandler,
78 },
79 typed_router::{TopicRouter, TypedSubscription},
80};
81
82pub(super) const HANDLER_BUFFER_CAP: usize = 64;
84
85thread_local! {
94 pub(super) static MESSAGE_BUS: OnceCell<Rc<RefCell<MessageBus>>> = const { OnceCell::new() };
95
96 pub(super) static ANY_HANDLERS: RefCell<SmallVec<[ShareableMessageHandler; HANDLER_BUFFER_CAP]>> =
97 RefCell::new(SmallVec::new());
98
99 pub(super) static DELTAS_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBookDeltas>; HANDLER_BUFFER_CAP]>> =
100 RefCell::new(SmallVec::new());
101 pub(super) static DEPTH10_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBookDepth10>; HANDLER_BUFFER_CAP]>> =
102 RefCell::new(SmallVec::new());
103 pub(super) static BOOK_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBook>; HANDLER_BUFFER_CAP]>> =
104 RefCell::new(SmallVec::new());
105 pub(super) static QUOTE_HANDLERS: RefCell<SmallVec<[TypedHandler<QuoteTick>; HANDLER_BUFFER_CAP]>> =
106 RefCell::new(SmallVec::new());
107 pub(super) static TRADE_HANDLERS: RefCell<SmallVec<[TypedHandler<TradeTick>; HANDLER_BUFFER_CAP]>> =
108 RefCell::new(SmallVec::new());
109 pub(super) static BAR_HANDLERS: RefCell<SmallVec<[TypedHandler<Bar>; HANDLER_BUFFER_CAP]>> =
110 RefCell::new(SmallVec::new());
111 pub(super) static MARK_PRICE_HANDLERS: RefCell<SmallVec<[TypedHandler<MarkPriceUpdate>; HANDLER_BUFFER_CAP]>> =
112 RefCell::new(SmallVec::new());
113 pub(super) static INDEX_PRICE_HANDLERS: RefCell<SmallVec<[TypedHandler<IndexPriceUpdate>; HANDLER_BUFFER_CAP]>> =
114 RefCell::new(SmallVec::new());
115 pub(super) static FUNDING_RATE_HANDLERS: RefCell<SmallVec<[TypedHandler<FundingRateUpdate>; HANDLER_BUFFER_CAP]>> =
116 RefCell::new(SmallVec::new());
117 pub(super) static GREEKS_HANDLERS: RefCell<SmallVec<[TypedHandler<GreeksData>; HANDLER_BUFFER_CAP]>> =
118 RefCell::new(SmallVec::new());
119 pub(super) static ACCOUNT_STATE_HANDLERS: RefCell<SmallVec<[TypedHandler<AccountState>; HANDLER_BUFFER_CAP]>> =
120 RefCell::new(SmallVec::new());
121 pub(super) static ORDER_EVENT_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderEventAny>; HANDLER_BUFFER_CAP]>> =
122 RefCell::new(SmallVec::new());
123 pub(super) static POSITION_EVENT_HANDLERS: RefCell<SmallVec<[TypedHandler<PositionEvent>; HANDLER_BUFFER_CAP]>> =
124 RefCell::new(SmallVec::new());
125
126 #[cfg(feature = "defi")]
127 pub(super) static DEFI_BLOCK_HANDLERS: RefCell<SmallVec<[TypedHandler<Block>; HANDLER_BUFFER_CAP]>> =
128 RefCell::new(SmallVec::new());
129 #[cfg(feature = "defi")]
130 pub(super) static DEFI_POOL_HANDLERS: RefCell<SmallVec<[TypedHandler<Pool>; HANDLER_BUFFER_CAP]>> =
131 RefCell::new(SmallVec::new());
132 #[cfg(feature = "defi")]
133 pub(super) static DEFI_SWAP_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolSwap>; HANDLER_BUFFER_CAP]>> =
134 RefCell::new(SmallVec::new());
135 #[cfg(feature = "defi")]
136 pub(super) static DEFI_LIQUIDITY_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolLiquidityUpdate>; HANDLER_BUFFER_CAP]>> =
137 RefCell::new(SmallVec::new());
138 #[cfg(feature = "defi")]
139 pub(super) static DEFI_COLLECT_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolFeeCollect>; HANDLER_BUFFER_CAP]>> =
140 RefCell::new(SmallVec::new());
141 #[cfg(feature = "defi")]
142 pub(super) static DEFI_FLASH_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolFlash>; HANDLER_BUFFER_CAP]>> =
143 RefCell::new(SmallVec::new());
144}
145
146pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
152 MESSAGE_BUS.with(|bus| {
153 assert!(
154 bus.set(msgbus).is_ok(),
155 "Failed to set MessageBus: already initialized for this thread"
156 );
157 });
158}
159
160pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
164 MESSAGE_BUS.with(|bus| {
165 bus.get_or_init(|| {
166 let msgbus = MessageBus::default();
167 Rc::new(RefCell::new(msgbus))
168 })
169 .clone()
170 })
171}