nautilus_common/msgbus/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! A common in-memory `MessageBus` for loosely coupled message passing patterns.
17
18pub mod database;
19pub mod handler;
20pub mod stubs;
21pub mod switchboard;
22
23use std::{
24    any::Any,
25    collections::HashMap,
26    fmt::Debug,
27    hash::{Hash, Hasher},
28};
29
30use handler::ShareableMessageHandler;
31use indexmap::IndexMap;
32use nautilus_core::UUID4;
33use nautilus_model::{data::Data, identifiers::TraderId};
34use switchboard::MessagingSwitchboard;
35use ustr::Ustr;
36
37use crate::messages::data::DataResponse;
38
/// The literal topic string `"CLOSE"`.
// NOTE(review): presumably published to signal that the bus is closing; nothing in this
// module uses the constant, so confirm against its callers.
pub const CLOSE_TOPIC: &str = "CLOSE";
40
/// Represents a subscription to a particular topic.
///
/// This is an internal type intended to be used by the message bus to organize
/// topics and their subscribers.
///
/// Two subscriptions are considered equal (and hash identically) when their `topic`
/// and `handler_id` are equal; `priority` only affects ordering.
///
/// # Warnings
///
/// Assigning priority handling is an advanced feature which *shouldn't
/// normally be needed by most users*. **Only assign a higher priority to the
/// subscription if you are certain of what you're doing**. If an inappropriate
/// priority is assigned then the handler may receive messages before core
/// system components have been able to process necessary calculations and
/// produce potential side effects for logically sound behavior.
#[derive(Clone)]
pub struct Subscription {
    /// The shareable message handler for the subscription.
    pub handler: ShareableMessageHandler,
    /// A copy of the handler ID, stored for faster equality checks and hashing.
    pub handler_id: Ustr,
    /// The topic for the subscription.
    pub topic: Ustr,
    /// The priority for the subscription determines the ordering of handlers receiving
    /// messages being processed, higher priority handlers will receive messages before
    /// lower priority handlers.
    pub priority: u8,
}
67
68impl Subscription {
69    /// Creates a new [`Subscription`] instance.
70    #[must_use]
71    pub fn new<T: AsRef<str>>(
72        topic: T,
73        handler: ShareableMessageHandler,
74        priority: Option<u8>,
75    ) -> Self {
76        let handler_id = handler.0.id();
77
78        Self {
79            handler_id,
80            topic: Ustr::from(topic.as_ref()),
81            handler,
82            priority: priority.unwrap_or(0),
83        }
84    }
85}
86
87impl Debug for Subscription {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        write!(
90            f,
91            "Subscription {{ topic: {}, handler: {}, priority: {} }}",
92            self.topic, self.handler_id, self.priority
93        )
94    }
95}
96
97impl PartialEq<Self> for Subscription {
98    fn eq(&self, other: &Self) -> bool {
99        self.topic == other.topic && self.handler_id == other.handler_id
100    }
101}
102
103impl Eq for Subscription {}
104
105impl PartialOrd for Subscription {
106    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
107        Some(self.cmp(other))
108    }
109}
110
111impl Ord for Subscription {
112    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
113        other.priority.cmp(&self.priority)
114    }
115}
116
117impl Hash for Subscription {
118    fn hash<H: Hasher>(&self, state: &mut H) {
119        self.topic.hash(state);
120        self.handler_id.hash(state);
121    }
122}
123
/// A generic message bus to facilitate various messaging patterns.
///
/// The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as
/// well as direct point-to-point messaging to registered endpoints.
///
/// Pub/Sub wildcard patterns for hierarchical topics are possible:
///  - `*` asterisk represents zero or more characters in a pattern.
///  - `?` question mark represents a single character in a pattern.
///
/// Given a topic and pattern potentially containing wildcard characters, i.e.
/// `*` and `?`, where `?` can match any single character in the topic, and `*`
/// can match any number of characters including zero characters.
///
/// The asterisk in a wildcard matches any character zero or more times. For
/// example, `comp*` matches anything beginning with `comp` which means `comp`,
/// `complete`, and `computer` are all matched.
///
/// A question mark matches a single character once. For example, `c?mp` matches
/// `camp` and `comp`. The question mark can also be used more than once.
/// For example, `c??p` would match both of the above examples and `coop`.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
pub struct MessageBus {
    /// The trader ID associated with the message bus.
    pub trader_id: TraderId,
    /// The instance ID associated with the message bus.
    pub instance_id: UUID4,
    /// The name for the message bus.
    pub name: String,
    /// If the message bus is backed by a database.
    pub has_backing: bool,
    /// The switchboard for built-in endpoints.
    pub switchboard: MessagingSwitchboard,
    /// Maps each active subscription to the keys of `patterns` that its topic
    /// matched at the time it was subscribed.
    ///
    /// A subscription topic can be a string with wildcards:
    /// * '?' - any single character
    /// * '*' - any number of any characters
    subscriptions: IndexMap<Subscription, Vec<Ustr>>,
    /// Maps a pattern to the subscriptions matched against it, kept in priority
    /// order; matching entries are appended whenever a new subscription is created.
    // NOTE(review): nothing in this file inserts keys into this map, so it appears to
    // stay empty here; confirm whether another module populates it.
    patterns: IndexMap<Ustr, Vec<Subscription>>,
    /// Maps an endpoint address to the handler for messages or requests destined
    /// for that specific endpoint.
    endpoints: IndexMap<Ustr, ShareableMessageHandler>,
}
170
// SAFETY: Message bus is not meant to be passed between threads
// NOTE(review): this records an intended usage rather than an invariant the compiler
// can check; these impls rely on callers only ever touching a given bus from a single
// thread, so confirm that every user of `MessageBus` upholds this.
unsafe impl Send for MessageBus {}
unsafe impl Sync for MessageBus {}
174
175impl MessageBus {
176    /// Creates a new [`MessageBus`] instance.
177    #[must_use]
178    pub fn new(
179        trader_id: TraderId,
180        instance_id: UUID4,
181        name: Option<String>,
182        _config: Option<HashMap<String, serde_json::Value>>,
183    ) -> Self {
184        Self {
185            trader_id,
186            instance_id,
187            name: name.unwrap_or(stringify!(MessageBus).to_owned()),
188            switchboard: MessagingSwitchboard::default(),
189            subscriptions: IndexMap::new(),
190            patterns: IndexMap::new(),
191            endpoints: IndexMap::new(),
192            has_backing: false,
193        }
194    }
195
196    /// Returns the message bus instances memory address.
197    #[must_use]
198    pub fn memory_address(&self) -> String {
199        format!("{:?}", std::ptr::from_ref(self))
200    }
201
202    /// Returns the registered endpoint addresses.
203    #[must_use]
204    pub fn endpoints(&self) -> Vec<&str> {
205        self.endpoints.keys().map(Ustr::as_str).collect()
206    }
207
208    /// Returns the topics for active subscriptions.
209    #[must_use]
210    pub fn topics(&self) -> Vec<&str> {
211        self.subscriptions
212            .keys()
213            .map(|s| s.topic.as_str())
214            .collect()
215    }
216
217    /// Returns whether there are subscribers for the given `pattern`.
218    #[must_use]
219    pub fn has_subscribers<T: AsRef<str>>(&self, pattern: T) -> bool {
220        self.matching_handlers(&Ustr::from(pattern.as_ref()))
221            .next()
222            .is_some()
223    }
224
225    /// Returns the count of subscribers for the given `pattern`.
226    #[must_use]
227    pub fn subscriptions_count<T: AsRef<str>>(&self, pattern: T) -> usize {
228        self.matching_subscriptions(&Ustr::from(pattern.as_ref()))
229            .len()
230    }
231
232    /// Returns whether there are subscribers for the given `pattern`.
233    #[must_use]
234    pub fn subscriptions(&self) -> Vec<&Subscription> {
235        self.subscriptions.keys().collect()
236    }
237
238    /// Returns whether there are subscribers for the given `pattern`.
239    #[must_use]
240    pub fn subscription_handler_ids(&self) -> Vec<&str> {
241        self.subscriptions
242            .keys()
243            .map(|s| s.handler_id.as_str())
244            .collect()
245    }
246
247    /// Returns whether there is a registered endpoint for the given `pattern`.
248    #[must_use]
249    pub fn is_registered<T: AsRef<str>>(&self, endpoint: T) -> bool {
250        self.endpoints.contains_key(&Ustr::from(endpoint.as_ref()))
251    }
252
253    /// Returns whether there are subscribers for the given `pattern`.
254    #[must_use]
255    pub fn is_subscribed<T: AsRef<str>>(&self, topic: T, handler: ShareableMessageHandler) -> bool {
256        let sub = Subscription::new(topic, handler, None);
257        self.subscriptions.contains_key(&sub)
258    }
259
260    /// Close the message bus which will close the sender channel and join the thread.
261    pub const fn close(&self) -> anyhow::Result<()> {
262        // TODO: Integrate the backing database
263        Ok(())
264    }
265
266    /// Registers the given `handler` for the `endpoint` address.
267    pub fn register<T: AsRef<str>>(&mut self, endpoint: T, handler: ShareableMessageHandler) {
268        log::debug!(
269            "Registering endpoint '{}' with handler ID {} at {}",
270            endpoint.as_ref(),
271            handler.0.id(),
272            self.memory_address(),
273        );
274        // Updates value if key already exists
275        self.endpoints
276            .insert(Ustr::from(endpoint.as_ref()), handler);
277    }
278
279    /// Deregisters the given `handler` for the `endpoint` address.
280    pub fn deregister(&mut self, endpoint: &Ustr) {
281        log::debug!(
282            "Deregistering endpoint '{endpoint}' at {}",
283            self.memory_address()
284        );
285        // Removes entry if it exists for endpoint
286        self.endpoints.shift_remove(endpoint);
287    }
288
289    /// Subscribes the given `handler` to the `topic`.
290    pub fn subscribe<T: AsRef<str>>(
291        &mut self,
292        topic: T,
293        handler: ShareableMessageHandler,
294        priority: Option<u8>,
295    ) {
296        log::debug!(
297            "Subscribing for topic '{}' at {}",
298            topic.as_ref(),
299            self.memory_address(),
300        );
301        let sub = Subscription::new(topic.as_ref(), handler, priority);
302        if self.subscriptions.contains_key(&sub) {
303            log::error!("{sub:?} already exists.");
304            return;
305        }
306
307        // Find existing patterns which match this topic
308        let mut matches = Vec::new();
309        for (pattern, subs) in &mut self.patterns {
310            if is_matching(&Ustr::from(topic.as_ref()), pattern) {
311                subs.push(sub.clone());
312                subs.sort();
313                // subs.sort_by(|a, b| a.priority.cmp(&b.priority).then_with(|| a.cmp(b)));
314                matches.push(*pattern);
315            }
316        }
317
318        matches.sort();
319
320        self.subscriptions.insert(sub, matches);
321    }
322
323    /// Unsubscribes the given `handler` from the `topic`.
324    pub fn unsubscribe<T: AsRef<str>>(&mut self, topic: T, handler: ShareableMessageHandler) {
325        log::debug!(
326            "Unsubscribing for topic '{}' at {}",
327            topic.as_ref(),
328            self.memory_address(),
329        );
330        let sub = Subscription::new(topic, handler, None);
331        self.subscriptions.shift_remove(&sub);
332    }
333
334    /// Returns the handler for the given `endpoint`.
335    #[must_use]
336    pub fn get_endpoint<T: AsRef<str>>(&self, endpoint: T) -> Option<&ShareableMessageHandler> {
337        self.endpoints.get(&Ustr::from(endpoint.as_ref()))
338    }
339
340    #[must_use]
341    pub fn matching_subscriptions<'a>(&'a self, pattern: &'a Ustr) -> Vec<&'a Subscription> {
342        let mut matching_subs: Vec<&'a Subscription> = Vec::new();
343
344        // Collect matching subscriptions from direct subscriptions
345        matching_subs.extend(self.subscriptions.iter().filter_map(|(sub, _)| {
346            if is_matching(&sub.topic, pattern) {
347                Some(sub)
348            } else {
349                None
350            }
351        }));
352
353        // Collect matching subscriptions from pattern-based subscriptions
354        // TODO: Improve efficiency of this
355        for subs in self.patterns.values() {
356            let filtered_subs: Vec<&Subscription> = subs
357                .iter()
358                // .filter(|sub| is_matching(&sub.topic, pattern))
359                // .filter(|sub| !matching_subs.contains(sub) && is_matching(&sub.topic, pattern))
360                .collect();
361
362            matching_subs.extend(filtered_subs);
363        }
364
365        // Sort into priority order
366        matching_subs.sort();
367        matching_subs
368    }
369
370    fn matching_handlers<'a>(
371        &'a self,
372        pattern: &'a Ustr,
373    ) -> impl Iterator<Item = &'a ShareableMessageHandler> {
374        self.subscriptions.iter().filter_map(move |(sub, _)| {
375            if is_matching(&sub.topic, pattern) {
376                Some(&sub.handler)
377            } else {
378                None
379            }
380        })
381    }
382
383    /// Sends a message to an endpoint.
384    pub fn send(&self, endpoint: &Ustr, message: &dyn Any) {
385        if let Some(handler) = self.get_endpoint(endpoint) {
386            handler.0.handle(message);
387        }
388    }
389
390    /// Publish a message to a topic.
391    pub fn publish(&self, topic: &Ustr, message: &dyn Any) {
392        log::trace!(
393            "Publishing topic '{topic}' {message:?} {}",
394            self.memory_address()
395        );
396        let matching_subs = self.matching_subscriptions(topic);
397
398        log::trace!("Matched {} subscriptions", matching_subs.len());
399
400        for sub in matching_subs {
401            log::trace!("Matched {sub:?}");
402            sub.handler.0.handle(message);
403        }
404    }
405}
406
407/// Data specific functions.
408impl MessageBus {
409    // /// Send a [`DataRequest`] to an endpoint that must be a data client implementation.
410    // pub fn send_data_request(&self, message: DataRequest) {
411    //     // TODO: log error
412    //     if let Some(client) = self.get_client(&message.client_id, message.venue) {
413    //         let _ = client.request(message);
414    //     }
415    // }
416    //
417    // /// Send a [`SubscriptionCommand`] to an endpoint that must be a data client implementation.
418    // pub fn send_subscription_command(&self, message: SubscriptionCommand) {
419    //     if let Some(client) = self.get_client(&message.client_id, message.venue) {
420    //         client.through_execute(message);
421    //     }
422    // }
423
424    /// Send a [`DataResponse`] to an endpoint that must be an actor.
425    pub fn send_response(&self, message: DataResponse) {
426        if let Some(handler) = self.get_endpoint(message.client_id.inner()) {
427            handler.0.handle_response(message);
428        }
429    }
430
431    /// Publish [`Data`] to a topic.
432    pub fn publish_data(&self, topic: &Ustr, message: Data) {
433        let matching_subs = self.matching_subscriptions(topic);
434
435        for sub in matching_subs {
436            sub.handler.0.handle_data(message.clone());
437        }
438    }
439}
440
441/// Match a topic and a string pattern
442/// pattern can contains -
443/// '*' - match 0 or more characters after this
444/// '?' - match any character once
445/// 'a-z' - match the specific character
446#[must_use]
447pub fn is_matching(topic: &Ustr, pattern: &Ustr) -> bool {
448    let mut table = [[false; 256]; 256];
449    table[0][0] = true;
450
451    let m = pattern.len();
452    let n = topic.len();
453
454    pattern.chars().enumerate().for_each(|(j, c)| {
455        if c == '*' {
456            table[0][j + 1] = table[0][j];
457        }
458    });
459
460    topic.chars().enumerate().for_each(|(i, tc)| {
461        pattern.chars().enumerate().for_each(|(j, pc)| {
462            if pc == '*' {
463                table[i + 1][j + 1] = table[i][j + 1] || table[i + 1][j];
464            } else if pc == '?' || tc == pc {
465                table[i + 1][j + 1] = table[i][j];
466            }
467        });
468    });
469
470    table[n][m]
471}
472
473impl Default for MessageBus {
474    /// Creates a new default [`MessageBus`] instance.
475    fn default() -> Self {
476        Self::new(TraderId::from("TRADER-001"), UUID4::new(), None, None)
477    }
478}
479
480////////////////////////////////////////////////////////////////////////////////
481// Tests
482////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {

    use nautilus_core::UUID4;
    use rstest::*;
    use stubs::check_handler_was_called;

    use super::*;
    use crate::msgbus::stubs::{get_call_check_shareable_handler, get_stub_shareable_handler};

    /// Builds a message bus with a fixed trader ID, a fresh instance ID and defaults otherwise.
    fn stub_msgbus() -> MessageBus {
        MessageBus::new(TraderId::from("trader-001"), UUID4::new(), None, None)
    }

    #[rstest]
    fn test_new() {
        let trader_id = TraderId::from("trader-001");
        let msgbus = MessageBus::new(trader_id, UUID4::new(), None, None);

        assert_eq!(msgbus.trader_id, trader_id);
        assert_eq!(msgbus.name, stringify!(MessageBus));
    }

    #[rstest]
    fn test_endpoints_when_no_endpoints() {
        let msgbus = stub_msgbus();

        assert!(msgbus.endpoints().is_empty());
    }

    #[rstest]
    fn test_topics_when_no_subscriptions() {
        let msgbus = stub_msgbus();

        assert!(msgbus.topics().is_empty());
        assert!(!msgbus.has_subscribers("my-topic"));
    }

    #[rstest]
    fn test_is_subscribed_when_no_subscriptions() {
        let msgbus = stub_msgbus();
        let handler = get_stub_shareable_handler(None);

        assert!(!msgbus.is_subscribed("my-topic", handler));
    }

    #[rstest]
    fn test_is_registered_when_no_registrations() {
        let msgbus = stub_msgbus();

        assert!(!msgbus.is_registered("MyEndpoint"));
    }

    #[rstest]
    fn test_register_endpoint() {
        let mut msgbus = stub_msgbus();
        let endpoint = "MyEndpoint";
        let handler = get_stub_shareable_handler(None);

        msgbus.register(endpoint, handler);

        assert_eq!(msgbus.endpoints(), vec![endpoint.to_string()]);
        assert!(msgbus.get_endpoint(endpoint).is_some());
    }

    #[rstest]
    fn test_endpoint_send() {
        let mut msgbus = stub_msgbus();
        let endpoint = Ustr::from("MyEndpoint");
        let handler = get_call_check_shareable_handler(None);

        msgbus.register(endpoint, handler.clone());
        assert!(msgbus.get_endpoint(endpoint).is_some());
        assert!(!check_handler_was_called(handler.clone()));

        // Send a message to the endpoint
        msgbus.send(&endpoint, &"Test Message");
        assert!(check_handler_was_called(handler));
    }

    #[rstest]
    fn test_deregister_endpoint() {
        let mut msgbus = stub_msgbus();
        let endpoint = Ustr::from("MyEndpoint");
        let handler = get_stub_shareable_handler(None);

        msgbus.register(endpoint, handler);
        msgbus.deregister(&endpoint);

        assert!(msgbus.endpoints().is_empty());
    }

    #[rstest]
    fn test_subscribe() {
        let mut msgbus = stub_msgbus();
        let topic = "my-topic";
        let handler = get_stub_shareable_handler(None);

        msgbus.subscribe(topic, handler, Some(1));

        assert!(msgbus.has_subscribers(topic));
        assert_eq!(msgbus.topics(), vec![topic]);
    }

    #[rstest]
    fn test_unsubscribe() {
        let mut msgbus = stub_msgbus();
        let topic = "my-topic";
        let handler = get_stub_shareable_handler(None);

        msgbus.subscribe(topic, handler.clone(), None);
        msgbus.unsubscribe(topic, handler);

        assert!(!msgbus.has_subscribers(topic));
        assert!(msgbus.topics().is_empty());
    }

    #[rstest]
    fn test_matching_subscriptions() {
        let mut msgbus = stub_msgbus();
        let topic = "my-topic";

        let handler_id1 = Ustr::from("1");
        let handler1 = get_stub_shareable_handler(Some(handler_id1));

        let handler_id2 = Ustr::from("2");
        let handler2 = get_stub_shareable_handler(Some(handler_id2));

        let handler_id3 = Ustr::from("3");
        let handler3 = get_stub_shareable_handler(Some(handler_id3));

        let handler_id4 = Ustr::from("4");
        let handler4 = get_stub_shareable_handler(Some(handler_id4));

        msgbus.subscribe(topic, handler1, None);
        msgbus.subscribe(topic, handler2, None);
        msgbus.subscribe(topic, handler3, Some(1));
        msgbus.subscribe(topic, handler4, Some(2));
        let topic = Ustr::from(topic);
        let subs = msgbus.matching_subscriptions(&topic);

        // Highest priority first; equal priorities keep their subscription order
        assert_eq!(subs.len(), 4);
        assert_eq!(subs[0].handler_id, handler_id4);
        assert_eq!(subs[1].handler_id, handler_id3);
        assert_eq!(subs[2].handler_id, handler_id1);
        assert_eq!(subs[3].handler_id, handler_id2);
    }

    #[rstest]
    #[case("*", "*", true)]
    #[case("a", "*", true)]
    #[case("a", "a", true)]
    #[case("a", "b", false)]
    // Empty topic and/or pattern
    #[case("", "", true)]
    #[case("", "*", true)]
    #[case("", "?", false)]
    #[case("a", "", false)]
    // Single character wildcard
    #[case("abc", "a?c", true)]
    #[case("ac", "a?c", false)]
    #[case("data.quotes.BINANCE", "data.*", true)]
    #[case("data.quotes.BINANCE", "data.quotes*", true)]
    #[case("data.quotes.BINANCE", "data.*.BINANCE", true)]
    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.*", true)]
    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ETH*", true)]
    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ETH???", false)]
    #[case("data.trades.BINANCE.ETHUSD", "data.*.BINANCE.ETH???", true)]
    // We don't support [seq] style pattern
    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[HC]USDT", false)]
    // We don't support [!seq] style pattern
    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[!ABC]USDT", false)]
    // We don't support [^seq] style pattern
    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[^ABC]USDT", false)]
    fn test_is_matching(#[case] topic: &str, #[case] pattern: &str, #[case] expected: bool) {
        assert_eq!(
            is_matching(&Ustr::from(topic), &Ustr::from(pattern)),
            expected
        );
    }
}