nautilus_common/msgbus/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! A common in-memory `MessageBus` supporting multiple messaging patterns:
17//!
18//! - Point-to-Point
19//! - Pub/Sub
20//! - Request/Response
21
22pub mod core;
23pub mod database;
24pub mod handler;
25pub mod matching;
26pub mod message;
27pub mod stubs;
28pub mod switchboard;
29
30#[cfg(test)]
31mod tests;
32
33pub use core::{Endpoint, MStr, MessageBus, Pattern, Subscription, Topic};
34use std::{
35    self,
36    any::Any,
37    cell::{OnceCell, RefCell},
38    rc::Rc,
39};
40
41use handler::ShareableMessageHandler;
42use matching::is_matching_backtracking;
43use nautilus_core::UUID4;
44use nautilus_model::data::Data;
45use ustr::Ustr;
46
47use crate::messages::data::DataResponse;
48pub use crate::msgbus::message::BusMessage;
49
// Thread-local storage for the `MessageBus` instance. Each thread (including async runtimes)
// gets its own `MessageBus` instance, eliminating the need for unsafe Send/Sync implementations
// while maintaining the global singleton access pattern that the framework expects.
//
// The slot is a `OnceCell`, so it is populated at most once per thread: either explicitly via
// `set_message_bus` or lazily with a default bus on the first call to `get_message_bus`.
thread_local! {
    static MESSAGE_BUS: OnceCell<Rc<RefCell<MessageBus>>> = const { OnceCell::new() };
}
56
57/// Sets the thread-local message bus.
58///
59/// # Panics
60///
61/// Panics if a message bus has already been set for this thread.
62pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
63    MESSAGE_BUS.with(|bus| {
64        assert!(
65            bus.set(msgbus).is_ok(),
66            "Failed to set MessageBus: already initialized for this thread"
67        );
68    });
69}
70
71/// Gets the thread-local message bus.
72///
73/// If no message bus has been set for this thread, a default one is created and initialized.
74/// This ensures each thread gets its own MessageBus instance, preventing data races while
75/// maintaining the singleton pattern that the codebase expects.
76pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
77    MESSAGE_BUS.with(|bus| {
78        bus.get_or_init(|| {
79            let msgbus = MessageBus::default();
80            Rc::new(RefCell::new(msgbus))
81        })
82        .clone()
83    })
84}
85
86/// Sends the `message` to the `endpoint`.
87pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
88    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
89    if let Some(handler) = handler {
90        handler.0.handle(message);
91    } else {
92        log::error!("send_any: no registered endpoint '{endpoint}'");
93    }
94}
95
96/// Sends the `message` to the `endpoint`.
97pub fn send<T: 'static>(endpoint: MStr<Endpoint>, message: T) {
98    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
99    if let Some(handler) = handler {
100        handler.0.handle(&message);
101    } else {
102        log::error!("send: no registered endpoint '{endpoint}'");
103    }
104}
105
106/// Sends the [`DataResponse`] to the registered correlation ID handler.
107pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
108    let handler = get_message_bus()
109        .borrow()
110        .get_response_handler(correlation_id)
111        .cloned();
112
113    if let Some(handler) = handler {
114        match message {
115            DataResponse::Data(resp) => handler.0.handle(resp),
116            DataResponse::Instrument(resp) => handler.0.handle(resp.as_ref()),
117            DataResponse::Instruments(resp) => handler.0.handle(resp),
118            DataResponse::Book(resp) => handler.0.handle(resp),
119            DataResponse::Quotes(resp) => handler.0.handle(resp),
120            DataResponse::Trades(resp) => handler.0.handle(resp),
121            DataResponse::Bars(resp) => handler.0.handle(resp),
122        }
123    } else {
124        log::error!("send_response: handler not found for correlation_id '{correlation_id}'");
125    }
126}
127
128/// Publish [`Data`] to a topic.
129pub fn publish_data(topic: &Ustr, message: Data) {
130    let matching_subs = get_message_bus()
131        .borrow_mut()
132        .matching_subscriptions(*topic);
133
134    for sub in matching_subs {
135        sub.handler.0.handle(&message);
136    }
137}
138
139pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
140    if let Err(e) = get_message_bus()
141        .borrow_mut()
142        .register_response_handler(correlation_id, handler)
143    {
144        log::error!("Failed to register request handler: {e}");
145    }
146}
147
148/// Publishes the `message` to the `topic`.
149pub fn publish(topic: MStr<Topic>, message: &dyn Any) {
150    let matching_subs = get_message_bus()
151        .borrow_mut()
152        .inner_matching_subscriptions(topic);
153
154    for sub in matching_subs {
155        sub.handler.0.handle(message);
156    }
157}
158
159/// Registers the `handler` for the `endpoint` address.
160pub fn register(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
161    log::debug!(
162        "Registering endpoint '{endpoint}' with handler ID {}",
163        handler.0.id(),
164    );
165
166    // Updates value if key already exists
167    get_message_bus()
168        .borrow_mut()
169        .endpoints
170        .insert(endpoint, handler);
171}
172
173/// Deregisters the handler for the `endpoint` address.
174pub fn deregister(endpoint: MStr<Endpoint>) {
175    log::debug!("Deregistering endpoint '{endpoint}'");
176
177    // Removes entry if it exists for endpoint
178    get_message_bus()
179        .borrow_mut()
180        .endpoints
181        .shift_remove(&endpoint);
182}
183
184/// Subscribes the `handler` to the `pattern` with an optional `priority`.
185///
186/// # Warnings
187///
188/// Assigning priority handling is an advanced feature which *shouldn't
189/// normally be needed by most users*. **Only assign a higher priority to the
190/// subscription if you are certain of what you're doing**. If an inappropriate
191/// priority is assigned then the handler may receive messages before core
192/// system components have been able to process necessary calculations and
193/// produce potential side effects for logically sound behavior.
194pub fn subscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler, priority: Option<u8>) {
195    let msgbus = get_message_bus();
196    let mut msgbus_ref_mut = msgbus.borrow_mut();
197    let sub = Subscription::new(pattern, handler, priority);
198
199    log::debug!(
200        "Subscribing {:?} for pattern '{}'",
201        sub.handler,
202        sub.pattern
203    );
204
205    // Prevent duplicate subscriptions for the exact pattern regardless of handler identity. This
206    // guards against callers accidentally registering multiple handlers for the same topic, which
207    // can lead to duplicated message delivery and unexpected side-effects.
208    if msgbus_ref_mut.subscriptions.contains(&sub) {
209        log::warn!("{sub:?} already exists");
210        return;
211    }
212
213    // Find existing patterns which match this topic
214    for (topic, subs) in &mut msgbus_ref_mut.topics {
215        if is_matching_backtracking(*topic, sub.pattern) {
216            // TODO: Consider binary_search and then insert
217            subs.push(sub.clone());
218            subs.sort();
219            log::debug!("Added subscription for '{topic}'");
220        }
221    }
222
223    msgbus_ref_mut.subscriptions.insert(sub);
224}
225
226pub fn subscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler, priority: Option<u8>) {
227    subscribe(topic.into(), handler, priority);
228}
229
230pub fn subscribe_str<T: AsRef<str>>(
231    pattern: T,
232    handler: ShareableMessageHandler,
233    priority: Option<u8>,
234) {
235    subscribe(MStr::from(pattern.as_ref()), handler, priority);
236}
237
238/// Unsubscribes the `handler` from the `pattern`.
239pub fn unsubscribe(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
240    log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
241
242    let sub = core::Subscription::new(pattern, handler, None);
243
244    get_message_bus()
245        .borrow_mut()
246        .topics
247        .values_mut()
248        .for_each(|subs| {
249            if let Ok(index) = subs.binary_search(&sub) {
250                subs.remove(index);
251            }
252        });
253
254    let removed = get_message_bus().borrow_mut().subscriptions.remove(&sub);
255
256    if removed {
257        log::debug!("Handler for pattern '{pattern}' was removed");
258    } else {
259        log::debug!("No matching handler for pattern '{pattern}' was found");
260    }
261}
262
263pub fn unsubscribe_topic(topic: MStr<Topic>, handler: ShareableMessageHandler) {
264    unsubscribe(topic.into(), handler);
265}
266
267pub fn unsubscribe_str<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) {
268    unsubscribe(MStr::from(pattern.as_ref()), handler);
269}
270
271pub fn is_subscribed<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
272    let pattern = MStr::from(pattern.as_ref());
273    let sub = Subscription::new(pattern, handler, None);
274    get_message_bus().borrow().subscriptions.contains(&sub)
275}
276
277pub fn subscriptions_count<T: AsRef<str>>(topic: T) -> usize {
278    get_message_bus().borrow().subscriptions_count(topic)
279}