1pub mod database;
19pub mod handler;
20pub mod stubs;
21pub mod switchboard;
22
23use std::{
24 any::Any,
25 collections::HashMap,
26 fmt::Debug,
27 hash::{Hash, Hasher},
28};
29
30use handler::ShareableMessageHandler;
31use indexmap::IndexMap;
32use nautilus_core::UUID4;
33use nautilus_model::{data::Data, identifiers::TraderId};
34use switchboard::MessagingSwitchboard;
35use ustr::Ustr;
36
37use crate::messages::data::DataResponse;
38
39pub const CLOSE_TOPIC: &str = "CLOSE";
40
41#[derive(Clone)]
55pub struct Subscription {
56 pub handler: ShareableMessageHandler,
58 pub handler_id: Ustr,
60 pub topic: Ustr,
62 pub priority: u8,
66}
67
68impl Subscription {
69 #[must_use]
71 pub fn new<T: AsRef<str>>(
72 topic: T,
73 handler: ShareableMessageHandler,
74 priority: Option<u8>,
75 ) -> Self {
76 let handler_id = handler.0.id();
77
78 Self {
79 handler_id,
80 topic: Ustr::from(topic.as_ref()),
81 handler,
82 priority: priority.unwrap_or(0),
83 }
84 }
85}
86
87impl Debug for Subscription {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 write!(
90 f,
91 "Subscription {{ topic: {}, handler: {}, priority: {} }}",
92 self.topic, self.handler_id, self.priority
93 )
94 }
95}
96
97impl PartialEq<Self> for Subscription {
98 fn eq(&self, other: &Self) -> bool {
99 self.topic == other.topic && self.handler_id == other.handler_id
100 }
101}
102
103impl Eq for Subscription {}
104
105impl PartialOrd for Subscription {
106 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
107 Some(self.cmp(other))
108 }
109}
110
111impl Ord for Subscription {
112 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
113 other.priority.cmp(&self.priority)
114 }
115}
116
117impl Hash for Subscription {
118 fn hash<H: Hasher>(&self, state: &mut H) {
119 self.topic.hash(state);
120 self.handler_id.hash(state);
121 }
122}
123
124#[cfg_attr(
145 feature = "python",
146 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
147)]
148pub struct MessageBus {
149 pub trader_id: TraderId,
151 pub instance_id: UUID4,
153 pub name: String,
155 pub has_backing: bool,
157 pub switchboard: MessagingSwitchboard,
159 subscriptions: IndexMap<Subscription, Vec<Ustr>>,
164 patterns: IndexMap<Ustr, Vec<Subscription>>,
167 endpoints: IndexMap<Ustr, ShareableMessageHandler>,
169}
170
171unsafe impl Send for MessageBus {}
173unsafe impl Sync for MessageBus {}
174
175impl MessageBus {
176 #[must_use]
178 pub fn new(
179 trader_id: TraderId,
180 instance_id: UUID4,
181 name: Option<String>,
182 _config: Option<HashMap<String, serde_json::Value>>,
183 ) -> Self {
184 Self {
185 trader_id,
186 instance_id,
187 name: name.unwrap_or(stringify!(MessageBus).to_owned()),
188 switchboard: MessagingSwitchboard::default(),
189 subscriptions: IndexMap::new(),
190 patterns: IndexMap::new(),
191 endpoints: IndexMap::new(),
192 has_backing: false,
193 }
194 }
195
196 #[must_use]
198 pub fn memory_address(&self) -> String {
199 format!("{:?}", std::ptr::from_ref(self))
200 }
201
202 #[must_use]
204 pub fn endpoints(&self) -> Vec<&str> {
205 self.endpoints.keys().map(Ustr::as_str).collect()
206 }
207
208 #[must_use]
210 pub fn topics(&self) -> Vec<&str> {
211 self.subscriptions
212 .keys()
213 .map(|s| s.topic.as_str())
214 .collect()
215 }
216
217 #[must_use]
219 pub fn has_subscribers<T: AsRef<str>>(&self, pattern: T) -> bool {
220 self.matching_handlers(&Ustr::from(pattern.as_ref()))
221 .next()
222 .is_some()
223 }
224
225 #[must_use]
227 pub fn subscriptions_count<T: AsRef<str>>(&self, pattern: T) -> usize {
228 self.matching_subscriptions(&Ustr::from(pattern.as_ref()))
229 .len()
230 }
231
232 #[must_use]
234 pub fn subscriptions(&self) -> Vec<&Subscription> {
235 self.subscriptions.keys().collect()
236 }
237
238 #[must_use]
240 pub fn subscription_handler_ids(&self) -> Vec<&str> {
241 self.subscriptions
242 .keys()
243 .map(|s| s.handler_id.as_str())
244 .collect()
245 }
246
247 #[must_use]
249 pub fn is_registered<T: AsRef<str>>(&self, endpoint: T) -> bool {
250 self.endpoints.contains_key(&Ustr::from(endpoint.as_ref()))
251 }
252
253 #[must_use]
255 pub fn is_subscribed<T: AsRef<str>>(&self, topic: T, handler: ShareableMessageHandler) -> bool {
256 let sub = Subscription::new(topic, handler, None);
257 self.subscriptions.contains_key(&sub)
258 }
259
260 pub const fn close(&self) -> anyhow::Result<()> {
262 Ok(())
264 }
265
266 pub fn register<T: AsRef<str>>(&mut self, endpoint: T, handler: ShareableMessageHandler) {
268 log::debug!(
269 "Registering endpoint '{}' with handler ID {} at {}",
270 endpoint.as_ref(),
271 handler.0.id(),
272 self.memory_address(),
273 );
274 self.endpoints
276 .insert(Ustr::from(endpoint.as_ref()), handler);
277 }
278
279 pub fn deregister(&mut self, endpoint: &Ustr) {
281 log::debug!(
282 "Deregistering endpoint '{endpoint}' at {}",
283 self.memory_address()
284 );
285 self.endpoints.shift_remove(endpoint);
287 }
288
289 pub fn subscribe<T: AsRef<str>>(
291 &mut self,
292 topic: T,
293 handler: ShareableMessageHandler,
294 priority: Option<u8>,
295 ) {
296 log::debug!(
297 "Subscribing for topic '{}' at {}",
298 topic.as_ref(),
299 self.memory_address(),
300 );
301 let sub = Subscription::new(topic.as_ref(), handler, priority);
302 if self.subscriptions.contains_key(&sub) {
303 log::error!("{sub:?} already exists.");
304 return;
305 }
306
307 let mut matches = Vec::new();
309 for (pattern, subs) in &mut self.patterns {
310 if is_matching(&Ustr::from(topic.as_ref()), pattern) {
311 subs.push(sub.clone());
312 subs.sort();
313 matches.push(*pattern);
315 }
316 }
317
318 matches.sort();
319
320 self.subscriptions.insert(sub, matches);
321 }
322
323 pub fn unsubscribe<T: AsRef<str>>(&mut self, topic: T, handler: ShareableMessageHandler) {
325 log::debug!(
326 "Unsubscribing for topic '{}' at {}",
327 topic.as_ref(),
328 self.memory_address(),
329 );
330 let sub = Subscription::new(topic, handler, None);
331 self.subscriptions.shift_remove(&sub);
332 }
333
334 #[must_use]
336 pub fn get_endpoint<T: AsRef<str>>(&self, endpoint: T) -> Option<&ShareableMessageHandler> {
337 self.endpoints.get(&Ustr::from(endpoint.as_ref()))
338 }
339
340 #[must_use]
341 pub fn matching_subscriptions<'a>(&'a self, pattern: &'a Ustr) -> Vec<&'a Subscription> {
342 let mut matching_subs: Vec<&'a Subscription> = Vec::new();
343
344 matching_subs.extend(self.subscriptions.iter().filter_map(|(sub, _)| {
346 if is_matching(&sub.topic, pattern) {
347 Some(sub)
348 } else {
349 None
350 }
351 }));
352
353 for subs in self.patterns.values() {
356 let filtered_subs: Vec<&Subscription> = subs
357 .iter()
358 .collect();
361
362 matching_subs.extend(filtered_subs);
363 }
364
365 matching_subs.sort();
367 matching_subs
368 }
369
370 fn matching_handlers<'a>(
371 &'a self,
372 pattern: &'a Ustr,
373 ) -> impl Iterator<Item = &'a ShareableMessageHandler> {
374 self.subscriptions.iter().filter_map(move |(sub, _)| {
375 if is_matching(&sub.topic, pattern) {
376 Some(&sub.handler)
377 } else {
378 None
379 }
380 })
381 }
382
383 pub fn send(&self, endpoint: &Ustr, message: &dyn Any) {
385 if let Some(handler) = self.get_endpoint(endpoint) {
386 handler.0.handle(message);
387 }
388 }
389
390 pub fn publish(&self, topic: &Ustr, message: &dyn Any) {
392 log::trace!(
393 "Publishing topic '{topic}' {message:?} {}",
394 self.memory_address()
395 );
396 let matching_subs = self.matching_subscriptions(topic);
397
398 log::trace!("Matched {} subscriptions", matching_subs.len());
399
400 for sub in matching_subs {
401 log::trace!("Matched {sub:?}");
402 sub.handler.0.handle(message);
403 }
404 }
405}
406
407impl MessageBus {
409 pub fn send_response(&self, message: DataResponse) {
426 if let Some(handler) = self.get_endpoint(message.client_id.inner()) {
427 handler.0.handle_response(message);
428 }
429 }
430
431 pub fn publish_data(&self, topic: &Ustr, message: Data) {
433 let matching_subs = self.matching_subscriptions(topic);
434
435 for sub in matching_subs {
436 sub.handler.0.handle_data(message.clone());
437 }
438 }
439}
440
441#[must_use]
447pub fn is_matching(topic: &Ustr, pattern: &Ustr) -> bool {
448 let mut table = [[false; 256]; 256];
449 table[0][0] = true;
450
451 let m = pattern.len();
452 let n = topic.len();
453
454 pattern.chars().enumerate().for_each(|(j, c)| {
455 if c == '*' {
456 table[0][j + 1] = table[0][j];
457 }
458 });
459
460 topic.chars().enumerate().for_each(|(i, tc)| {
461 pattern.chars().enumerate().for_each(|(j, pc)| {
462 if pc == '*' {
463 table[i + 1][j + 1] = table[i][j + 1] || table[i + 1][j];
464 } else if pc == '?' || tc == pc {
465 table[i + 1][j + 1] = table[i][j];
466 }
467 });
468 });
469
470 table[n][m]
471}
472
473impl Default for MessageBus {
474 fn default() -> Self {
476 Self::new(TraderId::from("TRADER-001"), UUID4::new(), None, None)
477 }
478}
479
480#[cfg(test)]
484mod tests {
485
486 use nautilus_core::UUID4;
487 use rstest::*;
488 use stubs::check_handler_was_called;
489
490 use super::*;
491 use crate::msgbus::stubs::{get_call_check_shareable_handler, get_stub_shareable_handler};
492
493 fn stub_msgbus() -> MessageBus {
494 MessageBus::new(TraderId::from("trader-001"), UUID4::new(), None, None)
495 }
496
497 #[rstest]
498 fn test_new() {
499 let trader_id = TraderId::from("trader-001");
500 let msgbus = MessageBus::new(trader_id, UUID4::new(), None, None);
501
502 assert_eq!(msgbus.trader_id, trader_id);
503 assert_eq!(msgbus.name, stringify!(MessageBus));
504 }
505
506 #[rstest]
507 fn test_endpoints_when_no_endpoints() {
508 let msgbus = stub_msgbus();
509
510 assert!(msgbus.endpoints().is_empty());
511 }
512
513 #[rstest]
514 fn test_topics_when_no_subscriptions() {
515 let msgbus = stub_msgbus();
516
517 assert!(msgbus.topics().is_empty());
518 assert!(!msgbus.has_subscribers("my-topic"));
519 }
520
521 #[rstest]
522 fn test_is_subscribed_when_no_subscriptions() {
523 let msgbus = stub_msgbus();
524 let handler = get_stub_shareable_handler(None);
525
526 assert!(!msgbus.is_subscribed("my-topic", handler));
527 }
528
529 #[rstest]
530 fn test_is_registered_when_no_registrations() {
531 let msgbus = stub_msgbus();
532
533 assert!(!msgbus.is_registered("MyEndpoint"));
534 }
535
536 #[rstest]
537 fn test_regsiter_endpoint() {
538 let mut msgbus = stub_msgbus();
539 let endpoint = "MyEndpoint";
540 let handler = get_stub_shareable_handler(None);
541
542 msgbus.register(endpoint, handler);
543
544 assert_eq!(msgbus.endpoints(), vec![endpoint.to_string()]);
545 assert!(msgbus.get_endpoint(endpoint).is_some());
546 }
547
548 #[rstest]
549 fn test_endpoint_send() {
550 let mut msgbus = stub_msgbus();
551 let endpoint = Ustr::from("MyEndpoint");
552 let handler = get_call_check_shareable_handler(None);
553
554 msgbus.register(endpoint, handler.clone());
555 assert!(msgbus.get_endpoint(endpoint).is_some());
556 assert!(!check_handler_was_called(handler.clone()));
557
558 msgbus.send(&endpoint, &"Test Message");
560 assert!(check_handler_was_called(handler));
561 }
562
563 #[rstest]
564 fn test_deregsiter_endpoint() {
565 let mut msgbus = stub_msgbus();
566 let endpoint = Ustr::from("MyEndpoint");
567 let handler = get_stub_shareable_handler(None);
568
569 msgbus.register(endpoint, handler);
570 msgbus.deregister(&endpoint);
571
572 assert!(msgbus.endpoints().is_empty());
573 }
574
575 #[rstest]
576 fn test_subscribe() {
577 let mut msgbus = stub_msgbus();
578 let topic = "my-topic";
579 let handler = get_stub_shareable_handler(None);
580
581 msgbus.subscribe(topic, handler, Some(1));
582
583 assert!(msgbus.has_subscribers(topic));
584 assert_eq!(msgbus.topics(), vec![topic]);
585 }
586
587 #[rstest]
588 fn test_unsubscribe() {
589 let mut msgbus = stub_msgbus();
590 let topic = "my-topic";
591 let handler = get_stub_shareable_handler(None);
592
593 msgbus.subscribe(topic, handler.clone(), None);
594 msgbus.unsubscribe(topic, handler);
595
596 assert!(!msgbus.has_subscribers(topic));
597 assert!(msgbus.topics().is_empty());
598 }
599
600 #[rstest]
601 fn test_matching_subscriptions() {
602 let mut msgbus = stub_msgbus();
603 let topic = "my-topic";
604
605 let handler_id1 = Ustr::from("1");
606 let handler1 = get_stub_shareable_handler(Some(handler_id1));
607
608 let handler_id2 = Ustr::from("2");
609 let handler2 = get_stub_shareable_handler(Some(handler_id2));
610
611 let handler_id3 = Ustr::from("3");
612 let handler3 = get_stub_shareable_handler(Some(handler_id3));
613
614 let handler_id4 = Ustr::from("4");
615 let handler4 = get_stub_shareable_handler(Some(handler_id4));
616
617 msgbus.subscribe(topic, handler1, None);
618 msgbus.subscribe(topic, handler2, None);
619 msgbus.subscribe(topic, handler3, Some(1));
620 msgbus.subscribe(topic, handler4, Some(2));
621 let topic = Ustr::from(topic);
622 let subs = msgbus.matching_subscriptions(&topic);
623
624 assert_eq!(subs.len(), 4);
625 assert_eq!(subs[0].handler_id, handler_id4);
626 assert_eq!(subs[1].handler_id, handler_id3);
627 assert_eq!(subs[2].handler_id, handler_id1);
628 assert_eq!(subs[3].handler_id, handler_id2);
629 }
630
631 #[rstest]
632 #[case("*", "*", true)]
633 #[case("a", "*", true)]
634 #[case("a", "a", true)]
635 #[case("a", "b", false)]
636 #[case("data.quotes.BINANCE", "data.*", true)]
637 #[case("data.quotes.BINANCE", "data.quotes*", true)]
638 #[case("data.quotes.BINANCE", "data.*.BINANCE", true)]
639 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.*", true)]
640 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ETH*", true)]
641 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ETH???", false)]
642 #[case("data.trades.BINANCE.ETHUSD", "data.*.BINANCE.ETH???", true)]
643 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[HC]USDT", false)]
645 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[!ABC]USDT", false)]
647 #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[^ABC]USDT", false)]
649 fn test_is_matching(#[case] topic: &str, #[case] pattern: &str, #[case] expected: bool) {
650 assert_eq!(
651 is_matching(&Ustr::from(topic), &Ustr::from(pattern)),
652 expected
653 );
654 }
655}