nautilus_common/msgbus/
mod.rs1pub mod core;
23pub mod database;
24pub mod handler;
25pub mod matching;
26pub mod message;
27pub mod stubs;
28pub mod switchboard;
29
30#[cfg(test)]
31mod tests;
32
33pub use core::{Endpoint, MStr, MessageBus, Pattern, Subscription, Topic};
34use std::{
35 self,
36 any::Any,
37 cell::{OnceCell, RefCell},
38 rc::Rc,
39};
40
41use handler::ShareableMessageHandler;
42use matching::is_matching_backtracking;
43use nautilus_core::UUID4;
44use nautilus_model::data::Data;
45use ustr::Ustr;
46
47use crate::messages::data::DataResponse;
48pub use crate::msgbus::message::BusMessage;
49
50thread_local! {
54 static MESSAGE_BUS: OnceCell<Rc<RefCell<MessageBus>>> = const { OnceCell::new() };
55}
56
57pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
63 MESSAGE_BUS.with(|bus| {
64 if bus.set(msgbus).is_err() {
65 panic!("Failed to set MessageBus: already initialized for this thread");
66 }
67 });
68}
69
70pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
76 MESSAGE_BUS.with(|bus| {
77 bus.get_or_init(|| {
78 let msgbus = MessageBus::default();
79 Rc::new(RefCell::new(msgbus))
80 })
81 .clone()
82 })
83}
84
85pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
87 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
88 if let Some(handler) = handler {
89 handler.0.handle(message);
90 } else {
91 log::error!("send_any: no registered endpoint '{endpoint}'");
92 }
93}
94
95pub fn send<T: 'static>(endpoint: MStr<Endpoint>, message: T) {
97 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
98 if let Some(handler) = handler {
99 handler.0.handle(&message);
100 } else {
101 log::error!("send: no registered endpoint '{endpoint}'");
102 }
103}
104
105pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
107 let handler = get_message_bus()
108 .borrow()
109 .get_response_handler(correlation_id)
110 .cloned();
111
112 if let Some(handler) = handler {
113 handler.0.handle(message);
114 } else {
115 log::error!("send_response: handler not found for correlation_id '{correlation_id}'");
116 }
117}
118
119pub fn publish_data(topic: &Ustr, message: Data) {
121 let matching_subs = get_message_bus()
122 .borrow_mut()
123 .matching_subscriptions(*topic);
124
125 for sub in matching_subs {
126 sub.handler.0.handle(&message);
127 }
128}
129
130pub fn response(correlation_id: &UUID4, message: &dyn Any) {
132 let handler = get_message_bus()
133 .borrow()
134 .get_response_handler(correlation_id)
135 .cloned();
136 if let Some(handler) = handler {
137 handler.0.handle(message);
138 } else {
139 log::error!("response: handler not found for correlation_id '{correlation_id}'");
140 }
141}
142
143pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
144 if let Err(e) = get_message_bus()
145 .borrow_mut()
146 .register_response_handler(correlation_id, handler)
147 {
148 log::error!("Failed to register request handler: {e}");
149 }
150}
151
152pub fn publish(topic: MStr<Topic>, message: &dyn Any) {
154 let matching_subs = get_message_bus()
155 .borrow_mut()
156 .inner_matching_subscriptions(topic);
157
158 for sub in matching_subs {
159 sub.handler.0.handle(message);
160 }
161}
162
163pub fn register(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
165 log::debug!(
166 "Registering endpoint '{endpoint}' with handler ID {}",
167 handler.0.id(),
168 );
169
170 get_message_bus()
172 .borrow_mut()
173 .endpoints
174 .insert(endpoint, handler);
175}
176
177pub fn deregister(endpoint: MStr<Endpoint>) {
179 log::debug!("Deregistering endpoint '{endpoint}'");
180
181 get_message_bus()
183 .borrow_mut()
184 .endpoints
185 .shift_remove(&endpoint);
186}
187
188pub fn subscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler, priority: Option<u8>) {
199 let msgbus = get_message_bus();
200 let mut msgbus_ref_mut = msgbus.borrow_mut();
201 let sub = Subscription::new(pattern, handler, priority);
202
203 log::debug!(
204 "Subscribing {:?} for pattern '{}'",
205 sub.handler,
206 sub.pattern
207 );
208
209 if msgbus_ref_mut.subscriptions.contains(&sub) {
213 log::warn!("{sub:?} already exists");
214 return;
215 }
216
217 for (topic, subs) in &mut msgbus_ref_mut.topics {
219 if is_matching_backtracking(*topic, sub.pattern) {
220 subs.push(sub.clone());
222 subs.sort();
223 log::debug!("Added subscription for '{topic}'");
224 }
225 }
226
227 msgbus_ref_mut.subscriptions.insert(sub);
228}
229
230pub fn subscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler, priority: Option<u8>) {
231 subscribe(topic.into(), handler, priority);
232}
233
234pub fn subscribe_str<T: AsRef<str>>(
235 pattern: T,
236 handler: ShareableMessageHandler,
237 priority: Option<u8>,
238) {
239 subscribe(MStr::from(pattern.as_ref()), handler, priority);
240}
241
242pub fn unsubscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
244 log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
245
246 let sub = core::Subscription::new(pattern, handler, None);
247
248 get_message_bus()
249 .borrow_mut()
250 .topics
251 .values_mut()
252 .for_each(|subs| {
253 if let Ok(index) = subs.binary_search(&sub) {
254 subs.remove(index);
255 }
256 });
257
258 let removed = get_message_bus().borrow_mut().subscriptions.remove(&sub);
259
260 if removed {
261 log::debug!("Handler for pattern '{pattern}' was removed");
262 } else {
263 log::debug!("No matching handler for pattern '{pattern}' was found");
264 }
265}
266
267pub fn unsubscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler) {
268 unsubscribe(topic.into(), handler);
269}
270
271pub fn unsubscribe_str<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) {
272 unsubscribe(MStr::from(pattern.as_ref()), handler);
273}
274
275pub fn is_subscribed<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
276 let pattern = MStr::from(pattern.as_ref());
277 let sub = Subscription::new(pattern, handler, None);
278 get_message_bus().borrow().subscriptions.contains(&sub)
279}
280
281pub fn subscriptions_count<T: AsRef<str>>(topic: T) -> usize {
282 get_message_bus().borrow().subscriptions_count(topic)
283}