nautilus_common/msgbus/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! A common in-memory `MessageBus` supporting multiple messaging patterns:
17//!
18//! - Point-to-Point
19//! - Pub/Sub
20//! - Request/Response
21
22pub mod core;
23pub mod database;
24pub mod handler;
25pub mod matching;
26pub mod message;
27pub mod stubs;
28pub mod switchboard;
29
30#[cfg(test)]
31mod tests;
32
33pub use core::{Endpoint, MStr, MessageBus, Pattern, Subscription, Topic};
34use std::{
35    self,
36    any::Any,
37    cell::{OnceCell, RefCell},
38    rc::Rc,
39};
40
41use handler::ShareableMessageHandler;
42use matching::is_matching_backtracking;
43use nautilus_core::UUID4;
44use nautilus_model::data::Data;
45use ustr::Ustr;
46
47use crate::messages::data::DataResponse;
48pub use crate::msgbus::message::BusMessage;
49
50// Thread-local storage for MessageBus instances. Each thread (including async runtimes)
51// gets its own MessageBus instance, eliminating the need for unsafe Send/Sync implementations
52// while maintaining the global singleton access pattern that the framework expects.
53thread_local! {
54    static MESSAGE_BUS: OnceCell<Rc<RefCell<MessageBus>>> = const { OnceCell::new() };
55}
56
57/// Sets the thread-local message bus.
58///
59/// # Panics
60///
61/// Panics if a message bus has already been set for this thread.
62pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
63    MESSAGE_BUS.with(|bus| {
64        if bus.set(msgbus).is_err() {
65            panic!("Failed to set MessageBus: already initialized for this thread");
66        }
67    });
68}
69
70/// Gets the thread-local message bus.
71///
72/// If no message bus has been set for this thread, a default one is created and initialized.
73/// This ensures each thread gets its own MessageBus instance, preventing data races while
74/// maintaining the singleton pattern that the codebase expects.
75pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
76    MESSAGE_BUS.with(|bus| {
77        bus.get_or_init(|| {
78            let msgbus = MessageBus::default();
79            Rc::new(RefCell::new(msgbus))
80        })
81        .clone()
82    })
83}
84
85/// Sends the `message` to the `endpoint`.
86pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
87    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
88    if let Some(handler) = handler {
89        handler.0.handle(message);
90    } else {
91        log::error!("send_any: no registered endpoint '{endpoint}'");
92    }
93}
94
95/// Sends the `message` to the `endpoint`.
96pub fn send<T: 'static>(endpoint: MStr<Endpoint>, message: T) {
97    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
98    if let Some(handler) = handler {
99        handler.0.handle(&message);
100    } else {
101        log::error!("send: no registered endpoint '{endpoint}'");
102    }
103}
104
105/// Sends the [`DataResponse`] to the registered correlation ID handler.
106pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
107    let handler = get_message_bus()
108        .borrow()
109        .get_response_handler(correlation_id)
110        .cloned();
111
112    if let Some(handler) = handler {
113        handler.0.handle(message);
114    } else {
115        log::error!("send_response: handler not found for correlation_id '{correlation_id}'");
116    }
117}
118
119/// Publish [`Data`] to a topic.
120pub fn publish_data(topic: &Ustr, message: Data) {
121    let matching_subs = get_message_bus()
122        .borrow_mut()
123        .matching_subscriptions(*topic);
124
125    for sub in matching_subs {
126        sub.handler.0.handle(&message);
127    }
128}
129
130/// Sends the response to the handler registered for the `correlation_id` (if found).
131pub fn response(correlation_id: &UUID4, message: &dyn Any) {
132    let handler = get_message_bus()
133        .borrow()
134        .get_response_handler(correlation_id)
135        .cloned();
136    if let Some(handler) = handler {
137        handler.0.handle(message);
138    } else {
139        log::error!("response: handler not found for correlation_id '{correlation_id}'");
140    }
141}
142
143pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
144    if let Err(e) = get_message_bus()
145        .borrow_mut()
146        .register_response_handler(correlation_id, handler)
147    {
148        log::error!("Failed to register request handler: {e}");
149    }
150}
151
152/// Publishes the `message` to the `topic`.
153pub fn publish(topic: MStr<Topic>, message: &dyn Any) {
154    let matching_subs = get_message_bus()
155        .borrow_mut()
156        .inner_matching_subscriptions(topic);
157
158    for sub in matching_subs {
159        sub.handler.0.handle(message);
160    }
161}
162
163/// Registers the `handler` for the `endpoint` address.
164pub fn register(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
165    log::debug!(
166        "Registering endpoint '{endpoint}' with handler ID {}",
167        handler.0.id(),
168    );
169
170    // Updates value if key already exists
171    get_message_bus()
172        .borrow_mut()
173        .endpoints
174        .insert(endpoint, handler);
175}
176
177/// Deregisters the handler for the `endpoint` address.
178pub fn deregister(endpoint: MStr<Endpoint>) {
179    log::debug!("Deregistering endpoint '{endpoint}'");
180
181    // Removes entry if it exists for endpoint
182    get_message_bus()
183        .borrow_mut()
184        .endpoints
185        .shift_remove(&endpoint);
186}
187
188/// Subscribes the `handler` to the `pattern` with an optional `priority`.
189///
190/// # Warnings
191///
192/// Assigning priority handling is an advanced feature which *shouldn't
193/// normally be needed by most users*. **Only assign a higher priority to the
194/// subscription if you are certain of what you're doing**. If an inappropriate
195/// priority is assigned then the handler may receive messages before core
196/// system components have been able to process necessary calculations and
197/// produce potential side effects for logically sound behavior.
198pub fn subscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler, priority: Option<u8>) {
199    let msgbus = get_message_bus();
200    let mut msgbus_ref_mut = msgbus.borrow_mut();
201    let sub = Subscription::new(pattern, handler, priority);
202
203    log::debug!(
204        "Subscribing {:?} for pattern '{}'",
205        sub.handler,
206        sub.pattern
207    );
208
209    // Prevent duplicate subscriptions for the exact pattern regardless of handler identity. This
210    // guards against callers accidentally registering multiple handlers for the same topic, which
211    // can lead to duplicated message delivery and unexpected side-effects.
212    if msgbus_ref_mut.subscriptions.contains(&sub) {
213        log::warn!("{sub:?} already exists");
214        return;
215    }
216
217    // Find existing patterns which match this topic
218    for (topic, subs) in &mut msgbus_ref_mut.topics {
219        if is_matching_backtracking(*topic, sub.pattern) {
220            // TODO: Consider binary_search and then insert
221            subs.push(sub.clone());
222            subs.sort();
223            log::debug!("Added subscription for '{topic}'");
224        }
225    }
226
227    msgbus_ref_mut.subscriptions.insert(sub);
228}
229
230pub fn subscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler, priority: Option<u8>) {
231    subscribe(topic.into(), handler, priority);
232}
233
234pub fn subscribe_str<T: AsRef<str>>(
235    pattern: T,
236    handler: ShareableMessageHandler,
237    priority: Option<u8>,
238) {
239    subscribe(MStr::from(pattern.as_ref()), handler, priority);
240}
241
242/// Unsubscribes the `handler` from the `pattern`.
243pub fn unsubscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
244    log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
245
246    let sub = core::Subscription::new(pattern, handler, None);
247
248    get_message_bus()
249        .borrow_mut()
250        .topics
251        .values_mut()
252        .for_each(|subs| {
253            if let Ok(index) = subs.binary_search(&sub) {
254                subs.remove(index);
255            }
256        });
257
258    let removed = get_message_bus().borrow_mut().subscriptions.remove(&sub);
259
260    if removed {
261        log::debug!("Handler for pattern '{pattern}' was removed");
262    } else {
263        log::debug!("No matching handler for pattern '{pattern}' was found");
264    }
265}
266
267pub fn unsubscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler) {
268    unsubscribe(topic.into(), handler);
269}
270
271pub fn unsubscribe_str<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) {
272    unsubscribe(MStr::from(pattern.as_ref()), handler);
273}
274
275pub fn is_subscribed<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
276    let pattern = MStr::from(pattern.as_ref());
277    let sub = Subscription::new(pattern, handler, None);
278    get_message_bus().borrow().subscriptions.contains(&sub)
279}
280
281pub fn subscriptions_count<T: AsRef<str>>(topic: T) -> usize {
282    get_message_bus().borrow().subscriptions_count(topic)
283}