nautilus_common/msgbus/
mod.rs1pub mod core;
23pub mod database;
24pub mod handler;
25pub mod matching;
26pub mod message;
27pub mod stubs;
28pub mod switchboard;
29
30#[cfg(test)]
31mod tests;
32
33pub use core::{Endpoint, MStr, MessageBus, Pattern, Subscription, Topic};
34use std::{
35 self,
36 any::Any,
37 cell::{OnceCell, RefCell},
38 rc::Rc,
39};
40
41use handler::ShareableMessageHandler;
42use matching::is_matching_backtracking;
43use nautilus_core::UUID4;
44use nautilus_model::data::Data;
45use ustr::Ustr;
46
47use crate::messages::data::DataResponse;
48pub use crate::msgbus::message::BusMessage;
49
50thread_local! {
54 static MESSAGE_BUS: OnceCell<Rc<RefCell<MessageBus>>> = const { OnceCell::new() };
55}
56
57pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
63 MESSAGE_BUS.with(|bus| {
64 assert!(
65 bus.set(msgbus).is_ok(),
66 "Failed to set MessageBus: already initialized for this thread"
67 );
68 });
69}
70
71pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
77 MESSAGE_BUS.with(|bus| {
78 bus.get_or_init(|| {
79 let msgbus = MessageBus::default();
80 Rc::new(RefCell::new(msgbus))
81 })
82 .clone()
83 })
84}
85
86pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
88 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
89 if let Some(handler) = handler {
90 handler.0.handle(message);
91 } else {
92 log::error!("send_any: no registered endpoint '{endpoint}'");
93 }
94}
95
96pub fn send<T: 'static>(endpoint: MStr<Endpoint>, message: T) {
98 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
99 if let Some(handler) = handler {
100 handler.0.handle(&message);
101 } else {
102 log::error!("send: no registered endpoint '{endpoint}'");
103 }
104}
105
106pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
108 let handler = get_message_bus()
109 .borrow()
110 .get_response_handler(correlation_id)
111 .cloned();
112
113 if let Some(handler) = handler {
114 match message {
115 DataResponse::Data(resp) => handler.0.handle(resp),
116 DataResponse::Instrument(resp) => handler.0.handle(resp.as_ref()),
117 DataResponse::Instruments(resp) => handler.0.handle(resp),
118 DataResponse::Book(resp) => handler.0.handle(resp),
119 DataResponse::Quotes(resp) => handler.0.handle(resp),
120 DataResponse::Trades(resp) => handler.0.handle(resp),
121 DataResponse::Bars(resp) => handler.0.handle(resp),
122 }
123 } else {
124 log::error!("send_response: handler not found for correlation_id '{correlation_id}'");
125 }
126}
127
128pub fn publish_data(topic: &Ustr, message: Data) {
130 let matching_subs = get_message_bus()
131 .borrow_mut()
132 .matching_subscriptions(*topic);
133
134 for sub in matching_subs {
135 sub.handler.0.handle(&message);
136 }
137}
138
139pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
140 if let Err(e) = get_message_bus()
141 .borrow_mut()
142 .register_response_handler(correlation_id, handler)
143 {
144 log::error!("Failed to register request handler: {e}");
145 }
146}
147
148pub fn publish(topic: MStr<Topic>, message: &dyn Any) {
150 let matching_subs = get_message_bus()
151 .borrow_mut()
152 .inner_matching_subscriptions(topic);
153
154 for sub in matching_subs {
155 sub.handler.0.handle(message);
156 }
157}
158
159pub fn register(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
161 log::debug!(
162 "Registering endpoint '{endpoint}' with handler ID {}",
163 handler.0.id(),
164 );
165
166 get_message_bus()
168 .borrow_mut()
169 .endpoints
170 .insert(endpoint, handler);
171}
172
173pub fn deregister(endpoint: MStr<Endpoint>) {
175 log::debug!("Deregistering endpoint '{endpoint}'");
176
177 get_message_bus()
179 .borrow_mut()
180 .endpoints
181 .shift_remove(&endpoint);
182}
183
184pub fn subscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler, priority: Option<u8>) {
195 let msgbus = get_message_bus();
196 let mut msgbus_ref_mut = msgbus.borrow_mut();
197 let sub = Subscription::new(pattern, handler, priority);
198
199 log::debug!(
200 "Subscribing {:?} for pattern '{}'",
201 sub.handler,
202 sub.pattern
203 );
204
205 if msgbus_ref_mut.subscriptions.contains(&sub) {
209 log::warn!("{sub:?} already exists");
210 return;
211 }
212
213 for (topic, subs) in &mut msgbus_ref_mut.topics {
215 if is_matching_backtracking(*topic, sub.pattern) {
216 subs.push(sub.clone());
218 subs.sort();
219 log::debug!("Added subscription for '{topic}'");
220 }
221 }
222
223 msgbus_ref_mut.subscriptions.insert(sub);
224}
225
226pub fn subscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler, priority: Option<u8>) {
227 subscribe(topic.into(), handler, priority);
228}
229
230pub fn subscribe_str<T: AsRef<str>>(
231 pattern: T,
232 handler: ShareableMessageHandler,
233 priority: Option<u8>,
234) {
235 subscribe(MStr::from(pattern.as_ref()), handler, priority);
236}
237
238pub fn unsubscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
240 log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
241
242 let sub = core::Subscription::new(pattern, handler, None);
243
244 get_message_bus()
245 .borrow_mut()
246 .topics
247 .values_mut()
248 .for_each(|subs| {
249 if let Ok(index) = subs.binary_search(&sub) {
250 subs.remove(index);
251 }
252 });
253
254 let removed = get_message_bus().borrow_mut().subscriptions.remove(&sub);
255
256 if removed {
257 log::debug!("Handler for pattern '{pattern}' was removed");
258 } else {
259 log::debug!("No matching handler for pattern '{pattern}' was found");
260 }
261}
262
263pub fn unsubscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler) {
264 unsubscribe(topic.into(), handler);
265}
266
267pub fn unsubscribe_str<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) {
268 unsubscribe(MStr::from(pattern.as_ref()), handler);
269}
270
271pub fn is_subscribed<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
272 let pattern = MStr::from(pattern.as_ref());
273 let sub = Subscription::new(pattern, handler, None);
274 get_message_bus().borrow().subscriptions.contains(&sub)
275}
276
277pub fn subscriptions_count<T: AsRef<str>>(topic: T) -> usize {
278 get_message_bus().borrow().subscriptions_count(topic)
279}