1pub mod database;
19pub mod handler;
20pub mod stubs;
21pub mod switchboard;
22
23use std::{
24 any::Any,
25 cell::RefCell,
26 collections::HashMap,
27 fmt::Debug,
28 hash::{Hash, Hasher},
29 rc::Rc,
30 sync::OnceLock,
31};
32
33use handler::ShareableMessageHandler;
34use indexmap::IndexMap;
35use nautilus_core::UUID4;
36use nautilus_model::{data::Data, identifiers::TraderId};
37use switchboard::MessagingSwitchboard;
38use ustr::Ustr;
39
40use crate::messages::data::DataResponse;
41
42pub const CLOSE_TOPIC: &str = "CLOSE";
43
44pub struct MessageBusWrapper(Rc<RefCell<MessageBus>>);
45
46unsafe impl Send for MessageBusWrapper {}
47unsafe impl Sync for MessageBusWrapper {}
48
49static MESSAGE_BUS: OnceLock<MessageBusWrapper> = OnceLock::new();
50
51pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
52 if MESSAGE_BUS.set(MessageBusWrapper(msgbus)).is_err() {
53 panic!("Failed to set MessageBus");
54 }
55}
56
57pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
58 if MESSAGE_BUS.get().is_none() {
59 let msgbus = MessageBus::default();
61 let msgbus = Rc::new(RefCell::new(msgbus));
62 let _ = MESSAGE_BUS.set(MessageBusWrapper(msgbus.clone()));
63 msgbus
64 } else {
65 MESSAGE_BUS.get().unwrap().0.clone()
66 }
67}
68
69pub fn send(endpoint: &Ustr, message: &dyn Any) {
70 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
71 if let Some(handler) = handler {
72 handler.0.handle(message);
73 }
74}
75
76pub fn publish(topic: &Ustr, message: &dyn Any) {
78 log::trace!(
79 "Publishing topic '{topic}' {message:?} at {}",
80 get_message_bus().borrow().memory_address()
81 );
82 let matching_subs = get_message_bus().borrow().matching_subscriptions(topic);
83
84 log::trace!("Matched {} subscriptions", matching_subs.len());
85
86 for sub in matching_subs {
87 log::trace!("Matched {sub:?}");
88 sub.handler.0.handle(message);
89 }
90}
91
92pub fn register<T: AsRef<str>>(endpoint: T, handler: ShareableMessageHandler) {
94 log::debug!(
95 "Registering endpoint '{}' with handler ID {} at {}",
96 endpoint.as_ref(),
97 handler.0.id(),
98 get_message_bus().borrow().memory_address(),
99 );
100
101 get_message_bus()
103 .borrow_mut()
104 .endpoints
105 .insert(Ustr::from(endpoint.as_ref()), handler);
106}
107
108pub fn deregister(endpoint: &Ustr) {
110 log::debug!(
111 "Deregistering endpoint '{endpoint}' at {}",
112 get_message_bus().borrow().memory_address()
113 );
114 get_message_bus()
116 .borrow_mut()
117 .endpoints
118 .shift_remove(endpoint);
119}
120
121pub fn subscribe<T: AsRef<str>>(topic: T, handler: ShareableMessageHandler, priority: Option<u8>) {
123 log::debug!(
124 "Subscribing for topic '{}' at {}",
125 topic.as_ref(),
126 get_message_bus().borrow().memory_address(),
127 );
128
129 let msgbus = get_message_bus();
130 let mut msgbus_ref_mut = msgbus.borrow_mut();
131
132 let sub = Subscription::new(topic.as_ref(), handler, priority);
133 if msgbus_ref_mut.subscriptions.contains_key(&sub) {
134 log::error!("{sub:?} already exists");
135 return;
136 }
137
138 let mut matches = Vec::new();
140 for (pattern, subs) in msgbus_ref_mut.patterns.iter_mut() {
141 if is_matching(&Ustr::from(topic.as_ref()), pattern) {
142 subs.push(sub.clone());
143 subs.sort();
144 matches.push(*pattern);
146 }
147 }
148
149 matches.sort();
150
151 msgbus_ref_mut.subscriptions.insert(sub, matches);
152}
153
154pub fn unsubscribe<T: AsRef<str>>(topic: T, handler: ShareableMessageHandler) {
156 log::debug!(
157 "Unsubscribing for topic '{}' at {}",
158 topic.as_ref(),
159 get_message_bus().borrow().memory_address(),
160 );
161 let sub = Subscription::new(topic, handler, None);
162 get_message_bus()
163 .borrow_mut()
164 .subscriptions
165 .shift_remove(&sub);
166}
167
168pub fn is_subscribed<T: AsRef<str>>(topic: T, handler: ShareableMessageHandler) -> bool {
169 let sub = Subscription::new(topic, handler, None);
170 get_message_bus().borrow().subscriptions.contains_key(&sub)
171}
172
173pub fn subscriptions_count<T: AsRef<str>>(topic: T) -> usize {
174 get_message_bus().borrow().subscriptions_count(topic)
175}
176
177#[derive(Clone)]
191pub struct Subscription {
192 pub handler: ShareableMessageHandler,
194 pub handler_id: Ustr,
196 pub topic: Ustr,
198 pub priority: u8,
202}
203
204impl Subscription {
205 #[must_use]
207 pub fn new<T: AsRef<str>>(
208 topic: T,
209 handler: ShareableMessageHandler,
210 priority: Option<u8>,
211 ) -> Self {
212 let handler_id = handler.0.id();
213
214 Self {
215 handler_id,
216 topic: Ustr::from(topic.as_ref()),
217 handler,
218 priority: priority.unwrap_or(0),
219 }
220 }
221}
222
223impl Debug for Subscription {
224 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225 write!(
226 f,
227 "Subscription {{ topic: {}, handler: {}, priority: {} }}",
228 self.topic, self.handler_id, self.priority
229 )
230 }
231}
232
233impl PartialEq<Self> for Subscription {
234 fn eq(&self, other: &Self) -> bool {
235 self.topic == other.topic && self.handler_id == other.handler_id
236 }
237}
238
239impl Eq for Subscription {}
240
241impl PartialOrd for Subscription {
242 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
243 Some(self.cmp(other))
244 }
245}
246
247impl Ord for Subscription {
248 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
249 other.priority.cmp(&self.priority)
250 }
251}
252
253impl Hash for Subscription {
254 fn hash<H: Hasher>(&self, state: &mut H) {
255 self.topic.hash(state);
256 self.handler_id.hash(state);
257 }
258}
259
260#[cfg_attr(
281 feature = "python",
282 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
283)]
284pub struct MessageBus {
285 pub trader_id: TraderId,
287 pub instance_id: UUID4,
289 pub name: String,
291 pub has_backing: bool,
293 pub switchboard: MessagingSwitchboard,
295 subscriptions: IndexMap<Subscription, Vec<Ustr>>,
300 patterns: IndexMap<Ustr, Vec<Subscription>>,
303 endpoints: IndexMap<Ustr, ShareableMessageHandler>,
305}
306
307unsafe impl Send for MessageBus {}
309unsafe impl Sync for MessageBus {}
310
311impl MessageBus {
312 #[must_use]
314 pub fn new(
315 trader_id: TraderId,
316 instance_id: UUID4,
317 name: Option<String>,
318 _config: Option<HashMap<String, serde_json::Value>>,
319 ) -> Self {
320 Self {
321 trader_id,
322 instance_id,
323 name: name.unwrap_or(stringify!(MessageBus).to_owned()),
324 switchboard: MessagingSwitchboard::default(),
325 subscriptions: IndexMap::new(),
326 patterns: IndexMap::new(),
327 endpoints: IndexMap::new(),
328 has_backing: false,
329 }
330 }
331
332 #[must_use]
334 pub fn memory_address(&self) -> String {
335 format!("{:?}", std::ptr::from_ref(self))
336 }
337
338 #[must_use]
340 pub fn endpoints(&self) -> Vec<&str> {
341 self.endpoints.keys().map(Ustr::as_str).collect()
342 }
343
344 #[must_use]
346 pub fn topics(&self) -> Vec<&str> {
347 self.subscriptions
348 .keys()
349 .map(|s| s.topic.as_str())
350 .collect()
351 }
352
353 #[must_use]
355 pub fn has_subscribers<T: AsRef<str>>(&self, pattern: T) -> bool {
356 self.matching_handlers(&Ustr::from(pattern.as_ref()))
357 .next()
358 .is_some()
359 }
360
361 #[must_use]
363 pub fn subscriptions_count<T: AsRef<str>>(&self, pattern: T) -> usize {
364 self.matching_subscriptions(&Ustr::from(pattern.as_ref()))
365 .len()
366 }
367
368 #[must_use]
370 pub fn subscriptions(&self) -> Vec<&Subscription> {
371 self.subscriptions.keys().collect()
372 }
373
374 #[must_use]
376 pub fn subscription_handler_ids(&self) -> Vec<&str> {
377 self.subscriptions
378 .keys()
379 .map(|s| s.handler_id.as_str())
380 .collect()
381 }
382
383 #[must_use]
385 pub fn is_registered<T: AsRef<str>>(&self, endpoint: T) -> bool {
386 self.endpoints.contains_key(&Ustr::from(endpoint.as_ref()))
387 }
388
389 #[must_use]
391 pub fn is_subscribed<T: AsRef<str>>(&self, topic: T, handler: ShareableMessageHandler) -> bool {
392 let sub = Subscription::new(topic, handler, None);
393 self.subscriptions.contains_key(&sub)
394 }
395
396 pub const fn close(&self) -> anyhow::Result<()> {
398 Ok(())
400 }
401 #[must_use]
403 pub fn get_endpoint<T: AsRef<str>>(&self, endpoint: T) -> Option<&ShareableMessageHandler> {
404 self.endpoints.get(&Ustr::from(endpoint.as_ref()))
405 }
406
407 #[must_use]
408 pub fn matching_subscriptions(&self, pattern: &Ustr) -> Vec<Subscription> {
409 let mut matching_subs: Vec<Subscription> = Vec::new();
410
411 matching_subs.extend(self.subscriptions.iter().filter_map(|(sub, _)| {
413 if is_matching(&sub.topic, pattern) {
414 Some(sub.clone())
415 } else {
416 None
417 }
418 }));
419
420 for subs in self.patterns.values() {
423 let filtered_subs: Vec<Subscription> = subs.to_vec();
424
425 matching_subs.extend(filtered_subs);
426 }
427
428 matching_subs.sort();
430 matching_subs
431 }
432
433 fn matching_handlers<'a>(
434 &'a self,
435 pattern: &'a Ustr,
436 ) -> impl Iterator<Item = &'a ShareableMessageHandler> {
437 self.subscriptions.iter().filter_map(move |(sub, _)| {
438 if is_matching(&sub.topic, pattern) {
439 Some(&sub.handler)
440 } else {
441 None
442 }
443 })
444 }
445}
446
447impl MessageBus {
449 pub fn send_response(&self, message: DataResponse) {
466 if let Some(handler) = self.get_endpoint(message.client_id.inner()) {
467 handler.0.handle(&message);
468 }
469 }
470
471 pub fn publish_data(&self, topic: &Ustr, message: Data) {
473 let matching_subs = self.matching_subscriptions(topic);
474
475 for sub in matching_subs {
476 sub.handler.0.handle(&message);
477 }
478 }
479
480 pub fn register_message_bus(self) -> Rc<RefCell<MessageBus>> {
482 let msgbus = Rc::new(RefCell::new(self));
483 set_message_bus(msgbus.clone());
484 msgbus
485 }
486}
487
488#[must_use]
494pub fn is_matching(topic: &Ustr, pattern: &Ustr) -> bool {
495 let mut table = [[false; 256]; 256];
496 table[0][0] = true;
497
498 let m = pattern.len();
499 let n = topic.len();
500
501 pattern.chars().enumerate().for_each(|(j, c)| {
502 if c == '*' {
503 table[0][j + 1] = table[0][j];
504 }
505 });
506
507 topic.chars().enumerate().for_each(|(i, tc)| {
508 pattern.chars().enumerate().for_each(|(j, pc)| {
509 if pc == '*' {
510 table[i + 1][j + 1] = table[i][j + 1] || table[i + 1][j];
511 } else if pc == '?' || tc == pc {
512 table[i + 1][j + 1] = table[i][j];
513 }
514 });
515 });
516
517 table[n][m]
518}
519
520impl Default for MessageBus {
521 fn default() -> Self {
523 Self::new(TraderId::from("TRADER-001"), UUID4::new(), None, None)
524 }
525}
526
527#[cfg(test)]
531pub(crate) mod tests {
532
533 use nautilus_core::UUID4;
534 use rstest::*;
535 use stubs::check_handler_was_called;
536
537 use super::*;
538 use crate::msgbus::stubs::{get_call_check_shareable_handler, get_stub_shareable_handler};
539
540 #[rstest]
541 fn test_new() {
542 let trader_id = TraderId::from("trader-001");
543 let msgbus = MessageBus::new(trader_id, UUID4::new(), None, None);
544
545 assert_eq!(msgbus.trader_id, trader_id);
546 assert_eq!(msgbus.name, stringify!(MessageBus));
547 }
548
549 #[rstest]
550 fn test_endpoints_when_no_endpoints() {
551 let msgbus = get_message_bus();
552 assert!(msgbus.borrow().endpoints().is_empty());
553 }
554
555 #[rstest]
556 fn test_topics_when_no_subscriptions() {
557 let msgbus = get_message_bus();
558 assert!(msgbus.borrow().topics().is_empty());
559 assert!(!msgbus.borrow().has_subscribers("my-topic"));
560 }
561
562 #[rstest]
563 fn test_is_subscribed_when_no_subscriptions() {
564 let msgbus = get_message_bus();
565 let handler = get_stub_shareable_handler(None);
566
567 assert!(!msgbus.borrow().is_subscribed("my-topic", handler));
568 }
569
570 #[rstest]
571 fn test_is_registered_when_no_registrations() {
572 let msgbus = get_message_bus();
573 assert!(!msgbus.borrow().is_registered("MyEndpoint"));
574 }
575
576 #[rstest]
577 fn test_regsiter_endpoint() {
578 let msgbus = get_message_bus();
579 let endpoint = "MyEndpoint";
580 let handler = get_stub_shareable_handler(None);
581
582 register(endpoint, handler);
583
584 assert_eq!(msgbus.borrow().endpoints(), vec![endpoint.to_string()]);
585 assert!(msgbus.borrow().get_endpoint(endpoint).is_some());
586 }
587
588 #[rstest]
589 fn test_endpoint_send() {
590 let msgbus = get_message_bus();
591 let endpoint = Ustr::from("MyEndpoint");
592 let handler = get_call_check_shareable_handler(None);
593
594 register(endpoint, handler.clone());
595 assert!(msgbus.borrow().get_endpoint(endpoint).is_some());
596 assert!(!check_handler_was_called(handler.clone()));
597
598 send(&endpoint, &"Test Message");
600 assert!(check_handler_was_called(handler));
601 }
602
603 #[rstest]
604 fn test_deregsiter_endpoint() {
605 let msgbus = get_message_bus();
606 let endpoint = Ustr::from("MyEndpoint");
607 let handler = get_stub_shareable_handler(None);
608
609 register(endpoint, handler);
610 deregister(&endpoint);
611
612 assert!(msgbus.borrow().endpoints().is_empty());
613 }
614
615 #[rstest]
616 fn test_subscribe() {
617 let msgbus = get_message_bus();
618 let topic = "my-topic";
619 let handler = get_stub_shareable_handler(None);
620
621 subscribe(topic, handler, Some(1));
622
623 assert!(msgbus.borrow().has_subscribers(topic));
624 assert_eq!(msgbus.borrow().topics(), vec![topic]);
625 }
626
627 #[rstest]
628 fn test_unsubscribe() {
629 let msgbus = get_message_bus();
630 let topic = "my-topic";
631 let handler = get_stub_shareable_handler(None);
632
633 subscribe(topic, handler.clone(), None);
634 unsubscribe(topic, handler);
635
636 assert!(!msgbus.borrow().has_subscribers(topic));
637 assert!(msgbus.borrow().topics().is_empty());
638 }
639
640 #[rstest]
641 fn test_matching_subscriptions() {
642 let msgbus = get_message_bus();
643 let topic = "my-topic";
644
645 let handler_id1 = Ustr::from("1");
646 let handler1 = get_stub_shareable_handler(Some(handler_id1));
647
648 let handler_id2 = Ustr::from("2");
649 let handler2 = get_stub_shareable_handler(Some(handler_id2));
650
651 let handler_id3 = Ustr::from("3");
652 let handler3 = get_stub_shareable_handler(Some(handler_id3));
653
654 let handler_id4 = Ustr::from("4");
655 let handler4 = get_stub_shareable_handler(Some(handler_id4));
656
657 subscribe(topic, handler1, None);
658 subscribe(topic, handler2, None);
659 subscribe(topic, handler3, Some(1));
660 subscribe(topic, handler4, Some(2));
661 let topic = Ustr::from(topic);
662
663 let subs = msgbus.borrow().matching_subscriptions(&topic);
664 assert_eq!(subs.len(), 4);
665 assert_eq!(subs[0].handler_id, handler_id4);
666 assert_eq!(subs[1].handler_id, handler_id3);
667 assert_eq!(subs[2].handler_id, handler_id1);
668 assert_eq!(subs[3].handler_id, handler_id2);
669 }
670
671 #[rstest]
672 #[case("*", "*", true)]
673 #[case("a", "*", true)]
674 #[case("a", "a", true)]
675 #[case("a", "b", false)]
676 #[case("data.quotes.BINANCE", "data.*", true)]
677 #[case("data.quotes.BINANCE", "data.quotes*", true)]
678 #[case("data.quotes.BINANCE", "data.*.BINANCE", true)]
679 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.*", true)]
680 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ETH*", true)]
681 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ETH???", false)]
682 #[case("data.trades.BINANCE.ETHUSD", "data.*.BINANCE.ETH???", true)]
683 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[HC]USDT", false)]
685 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[!ABC]USDT", false)]
687 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[^ABC]USDT", false)]
689 fn test_is_matching(#[case] topic: &str, #[case] pattern: &str, #[case] expected: bool) {
690 assert_eq!(
691 is_matching(&Ustr::from(topic), &Ustr::from(pattern)),
692 expected
693 );
694 }
695}