1use std::{
87 any::{Any, TypeId},
88 cell::RefCell,
89 collections::HashMap,
90 hash::{Hash, Hasher},
91 rc::Rc,
92};
93
94use ahash::{AHashMap, AHashSet};
95use indexmap::IndexMap;
96use nautilus_core::{UUID4, correctness::FAILED};
97use nautilus_model::{
98 data::{
99 Bar, Data, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate,
100 OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
101 },
102 events::{AccountState, OrderEventAny, PositionEvent},
103 identifiers::TraderId,
104 orderbook::OrderBook,
105 orders::OrderAny,
106 position::Position,
107};
108use smallvec::SmallVec;
109use ustr::Ustr;
110
111use super::{
112 ShareableMessageHandler,
113 matching::is_matching_backtracking,
114 mstr::{Endpoint, MStr, Pattern, Topic},
115 set_message_bus,
116 switchboard::MessagingSwitchboard,
117 typed_endpoints::{EndpointMap, IntoEndpointMap},
118 typed_router::TopicRouter,
119};
120use crate::messages::{
121 data::{DataCommand, DataResponse},
122 execution::{ExecutionReport, TradingCommand},
123};
124
/// A subscription binding a message handler to a topic pattern.
///
/// Equality and hashing (see the `PartialEq` and `Hash` impls in this file)
/// consider only `pattern` and `handler_id`; the `Ord` impl additionally
/// takes `priority` into account.
#[derive(Clone, Debug)]
pub struct Subscription {
    /// The handler registered for this subscription.
    pub handler: ShareableMessageHandler,
    /// Identifier of `handler`, captured from `handler.0.id()` by [`Subscription::new`].
    pub handler_id: Ustr,
    /// The pattern this subscription was created with.
    pub pattern: MStr<Pattern>,
    /// Priority used for ordering: higher values sort first (see the `Ord` impl).
    /// [`Subscription::new`] uses `0` when no priority is given.
    pub priority: u8,
}
143
144impl Subscription {
145 #[must_use]
147 pub fn new(
148 pattern: MStr<Pattern>,
149 handler: ShareableMessageHandler,
150 priority: Option<u8>,
151 ) -> Self {
152 Self {
153 handler_id: handler.0.id(),
154 pattern,
155 handler,
156 priority: priority.unwrap_or(0),
157 }
158 }
159}
160
161impl PartialEq<Self> for Subscription {
162 fn eq(&self, other: &Self) -> bool {
163 self.pattern == other.pattern && self.handler_id == other.handler_id
164 }
165}
166
// Marker impl: subscription equality (pattern + handler ID) is a full equivalence relation.
impl Eq for Subscription {}
168
impl PartialOrd for Subscription {
    /// Delegates to the total order defined by the `Ord` impl, so this always
    /// returns `Some`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
174
175impl Ord for Subscription {
176 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
177 other
178 .priority
179 .cmp(&self.priority)
180 .then_with(|| self.pattern.cmp(&other.pattern))
181 .then_with(|| self.handler_id.cmp(&other.handler_id))
182 }
183}
184
185impl Hash for Subscription {
186 fn hash<H: Hasher>(&self, state: &mut H) {
187 self.pattern.hash(state);
188 self.handler_id.hash(state);
189 }
190}
191
/// The message bus state: subscriptions, endpoints, response handlers, and the
/// typed routers / endpoint maps for the built-in message types.
#[derive(Debug)]
pub struct MessageBus {
    /// Trader ID this bus was created with.
    pub trader_id: TraderId,
    /// Instance ID this bus was created with.
    pub instance_id: UUID4,
    /// Name of the bus; `new` uses `"MessageBus"` when none is given.
    pub name: String,
    /// Initialised to `false` by `new`.
    // NOTE(review): presumably indicates an external backing store — confirm.
    pub has_backing: bool,
    /// Switchboard, default-constructed by `new`.
    pub(crate) switchboard: MessagingSwitchboard,
    /// Set of subscriptions, unique by (pattern, handler ID).
    pub(crate) subscriptions: AHashSet<Subscription>,
    /// Cache of sorted subscription matches per concrete topic, filled lazily
    /// by the matching lookups on this type.
    pub(crate) topics: IndexMap<MStr<Topic>, Vec<Subscription>>,
    /// Handlers keyed by endpoint name.
    pub(crate) endpoints: IndexMap<MStr<Endpoint>, ShareableMessageHandler>,
    /// Response handlers keyed by correlation ID.
    pub(crate) correlation_index: AHashMap<UUID4, ShareableMessageHandler>,
    // Typed topic routers, one per built-in message type.
    pub(crate) router_quotes: TopicRouter<QuoteTick>,
    pub(crate) router_trades: TopicRouter<TradeTick>,
    pub(crate) router_bars: TopicRouter<Bar>,
    pub(crate) router_deltas: TopicRouter<OrderBookDeltas>,
    pub(crate) router_depth10: TopicRouter<OrderBookDepth10>,
    pub(crate) router_book_snapshots: TopicRouter<OrderBook>,
    pub(crate) router_mark_prices: TopicRouter<MarkPriceUpdate>,
    pub(crate) router_index_prices: TopicRouter<IndexPriceUpdate>,
    pub(crate) router_funding_rates: TopicRouter<FundingRateUpdate>,
    pub(crate) router_order_events: TopicRouter<OrderEventAny>,
    pub(crate) router_position_events: TopicRouter<PositionEvent>,
    pub(crate) router_account_state: TopicRouter<AccountState>,
    pub(crate) router_orders: TopicRouter<OrderAny>,
    pub(crate) router_positions: TopicRouter<Position>,
    pub(crate) router_greeks: TopicRouter<GreeksData>,
    // DeFi routers and endpoints, only compiled with the `defi` feature.
    #[cfg(feature = "defi")]
    pub(crate) router_defi_blocks: TopicRouter<nautilus_model::defi::Block>,
    #[cfg(feature = "defi")]
    pub(crate) router_defi_pools: TopicRouter<nautilus_model::defi::Pool>,
    #[cfg(feature = "defi")]
    pub(crate) router_defi_swaps: TopicRouter<nautilus_model::defi::PoolSwap>,
    #[cfg(feature = "defi")]
    pub(crate) router_defi_liquidity: TopicRouter<nautilus_model::defi::PoolLiquidityUpdate>,
    #[cfg(feature = "defi")]
    pub(crate) router_defi_collects: TopicRouter<nautilus_model::defi::PoolFeeCollect>,
    #[cfg(feature = "defi")]
    pub(crate) router_defi_flash: TopicRouter<nautilus_model::defi::PoolFlash>,
    #[cfg(feature = "defi")]
    pub(crate) endpoints_defi_data: IntoEndpointMap<nautilus_model::defi::DefiData>,
    // Typed endpoint maps, one per built-in message type.
    pub(crate) endpoints_quotes: EndpointMap<QuoteTick>,
    pub(crate) endpoints_trades: EndpointMap<TradeTick>,
    pub(crate) endpoints_bars: EndpointMap<Bar>,
    pub(crate) endpoints_account_state: EndpointMap<AccountState>,
    pub(crate) endpoints_trading_commands: IntoEndpointMap<TradingCommand>,
    pub(crate) endpoints_data_commands: IntoEndpointMap<DataCommand>,
    pub(crate) endpoints_data_responses: IntoEndpointMap<DataResponse>,
    pub(crate) endpoints_exec_reports: IntoEndpointMap<ExecutionReport>,
    pub(crate) endpoints_order_events: IntoEndpointMap<OrderEventAny>,
    pub(crate) endpoints_data: IntoEndpointMap<Data>,
    /// Type-erased `TopicRouter<T>` instances keyed by `TypeId::of::<T>()`,
    /// created on demand by `router`.
    routers_typed: AHashMap<TypeId, Box<dyn Any>>,
    /// Type-erased `EndpointMap<T>` instances keyed by `TypeId::of::<T>()`,
    /// created on demand by `endpoint_map`.
    endpoints_typed: AHashMap<TypeId, Box<dyn Any>>,
}
269
270impl Default for MessageBus {
271 fn default() -> Self {
273 Self::new(TraderId::from("TRADER-001"), UUID4::new(), None, None)
274 }
275}
276
277impl MessageBus {
278 #[must_use]
280 pub fn new(
281 trader_id: TraderId,
282 instance_id: UUID4,
283 name: Option<String>,
284 _config: Option<HashMap<String, serde_json::Value>>,
285 ) -> Self {
286 Self {
287 trader_id,
288 instance_id,
289 name: name.unwrap_or(stringify!(MessageBus).to_owned()),
290 switchboard: MessagingSwitchboard::default(),
291 subscriptions: AHashSet::new(),
292 topics: IndexMap::new(),
293 endpoints: IndexMap::new(),
294 correlation_index: AHashMap::new(),
295 has_backing: false,
296 router_quotes: TopicRouter::new(),
297 router_trades: TopicRouter::new(),
298 router_bars: TopicRouter::new(),
299 router_deltas: TopicRouter::new(),
300 router_depth10: TopicRouter::new(),
301 router_book_snapshots: TopicRouter::new(),
302 router_mark_prices: TopicRouter::new(),
303 router_index_prices: TopicRouter::new(),
304 router_funding_rates: TopicRouter::new(),
305 router_order_events: TopicRouter::new(),
306 router_position_events: TopicRouter::new(),
307 router_account_state: TopicRouter::new(),
308 router_orders: TopicRouter::new(),
309 router_positions: TopicRouter::new(),
310 router_greeks: TopicRouter::new(),
311 #[cfg(feature = "defi")]
312 router_defi_blocks: TopicRouter::new(),
313 #[cfg(feature = "defi")]
314 router_defi_pools: TopicRouter::new(),
315 #[cfg(feature = "defi")]
316 router_defi_swaps: TopicRouter::new(),
317 #[cfg(feature = "defi")]
318 router_defi_liquidity: TopicRouter::new(),
319 #[cfg(feature = "defi")]
320 router_defi_collects: TopicRouter::new(),
321 #[cfg(feature = "defi")]
322 router_defi_flash: TopicRouter::new(),
323 #[cfg(feature = "defi")]
324 endpoints_defi_data: IntoEndpointMap::new(),
325 endpoints_quotes: EndpointMap::new(),
326 endpoints_trades: EndpointMap::new(),
327 endpoints_bars: EndpointMap::new(),
328 endpoints_account_state: EndpointMap::new(),
329 endpoints_trading_commands: IntoEndpointMap::new(),
330 endpoints_data_commands: IntoEndpointMap::new(),
331 endpoints_data_responses: IntoEndpointMap::new(),
332 endpoints_exec_reports: IntoEndpointMap::new(),
333 endpoints_order_events: IntoEndpointMap::new(),
334 endpoints_data: IntoEndpointMap::new(),
335 routers_typed: AHashMap::new(),
336 endpoints_typed: AHashMap::new(),
337 }
338 }
339
340 pub fn register_message_bus(self) -> Rc<RefCell<Self>> {
342 let msgbus = Rc::new(RefCell::new(self));
343 set_message_bus(msgbus.clone());
344 msgbus
345 }
346
347 pub fn router<T: 'static>(&mut self) -> &mut TopicRouter<T> {
353 self.routers_typed
354 .entry(TypeId::of::<T>())
355 .or_insert_with(|| Box::new(TopicRouter::<T>::new()))
356 .downcast_mut::<TopicRouter<T>>()
357 .expect("TopicRouter type mismatch - this is a bug")
358 }
359
360 pub fn endpoint_map<T: 'static>(&mut self) -> &mut EndpointMap<T> {
366 self.endpoints_typed
367 .entry(TypeId::of::<T>())
368 .or_insert_with(|| Box::new(EndpointMap::<T>::new()))
369 .downcast_mut::<EndpointMap<T>>()
370 .expect("EndpointMap type mismatch - this is a bug")
371 }
372
373 #[must_use]
375 pub fn mem_address(&self) -> String {
376 format!("{self:p}")
377 }
378
    /// Returns a reference to the bus's [`MessagingSwitchboard`].
    #[must_use]
    pub fn switchboard(&self) -> &MessagingSwitchboard {
        &self.switchboard
    }
384
385 #[must_use]
387 pub fn endpoints(&self) -> Vec<&str> {
388 self.endpoints.iter().map(|e| e.0.as_str()).collect()
389 }
390
391 #[must_use]
393 pub fn patterns(&self) -> Vec<&str> {
394 self.subscriptions
395 .iter()
396 .map(|s| s.pattern.as_str())
397 .collect()
398 }
399
400 pub fn has_subscribers<T: AsRef<str>>(&self, topic: T) -> bool {
402 self.subscriptions_count(topic) > 0
403 }
404
405 #[must_use]
411 pub fn subscriptions_count<T: AsRef<str>>(&self, topic: T) -> usize {
412 let topic = MStr::<Topic>::topic(topic).expect(FAILED);
413 self.topics
414 .get(&topic)
415 .map_or_else(|| self.find_topic_matches(topic).len(), |subs| subs.len())
416 }
417
418 #[must_use]
420 pub fn subscriptions(&self) -> Vec<&Subscription> {
421 self.subscriptions.iter().collect()
422 }
423
424 #[must_use]
426 pub fn subscription_handler_ids(&self) -> Vec<&str> {
427 self.subscriptions
428 .iter()
429 .map(|s| s.handler_id.as_str())
430 .collect()
431 }
432
433 #[must_use]
439 pub fn is_registered<T: Into<MStr<Endpoint>>>(&self, endpoint: T) -> bool {
440 let endpoint: MStr<Endpoint> = endpoint.into();
441 self.endpoints.contains_key(&endpoint)
442 }
443
444 #[must_use]
446 pub fn is_subscribed<T: AsRef<str>>(
447 &self,
448 pattern: T,
449 handler: ShareableMessageHandler,
450 ) -> bool {
451 let pattern = MStr::<Pattern>::pattern(pattern);
452 let sub = Subscription::new(pattern, handler, None);
453 self.subscriptions.contains(&sub)
454 }
455
    /// Closes the message bus.
    ///
    /// Currently a no-op that always returns `Ok(())`.
    pub const fn close(&self) -> anyhow::Result<()> {
        Ok(())
    }
465
    /// Returns the handler registered under `endpoint`, or `None` if there is none.
    #[must_use]
    pub fn get_endpoint(&self, endpoint: MStr<Endpoint>) -> Option<&ShareableMessageHandler> {
        self.endpoints.get(&endpoint)
    }
471
    /// Returns the response handler registered for `correlation_id`, or `None`
    /// if there is none.
    #[must_use]
    pub fn get_response_handler(&self, correlation_id: &UUID4) -> Option<&ShareableMessageHandler> {
        self.correlation_index.get(correlation_id)
    }
477
478 pub(crate) fn find_topic_matches(&self, topic: MStr<Topic>) -> Vec<Subscription> {
480 self.subscriptions
481 .iter()
482 .filter_map(|sub| {
483 if is_matching_backtracking(topic, sub.pattern) {
484 Some(sub.clone())
485 } else {
486 None
487 }
488 })
489 .collect()
490 }
491
492 #[must_use]
495 pub fn matching_subscriptions<T: Into<MStr<Topic>>>(&mut self, topic: T) -> Vec<Subscription> {
496 self.inner_matching_subscriptions(topic.into())
497 }
498
499 pub(crate) fn inner_matching_subscriptions(&mut self, topic: MStr<Topic>) -> Vec<Subscription> {
500 self.topics.get(&topic).cloned().unwrap_or_else(|| {
501 let mut matches = self.find_topic_matches(topic);
502 matches.sort();
503 self.topics.insert(topic, matches.clone());
504 matches
505 })
506 }
507
508 pub(crate) fn fill_matching_any_handlers(
510 &mut self,
511 topic: MStr<Topic>,
512 buf: &mut SmallVec<[ShareableMessageHandler; 64]>,
513 ) {
514 if let Some(subs) = self.topics.get(&topic) {
515 for sub in subs {
516 buf.push(sub.handler.clone());
517 }
518 } else {
519 let mut matches = self.find_topic_matches(topic);
520 matches.sort();
521
522 for sub in &matches {
523 buf.push(sub.handler.clone());
524 }
525
526 self.topics.insert(topic, matches);
527 }
528 }
529
530 pub fn register_response_handler(
536 &mut self,
537 correlation_id: &UUID4,
538 handler: ShareableMessageHandler,
539 ) -> anyhow::Result<()> {
540 if self.correlation_index.contains_key(correlation_id) {
541 anyhow::bail!("Correlation ID <{correlation_id}> already has a registered handler");
542 }
543
544 self.correlation_index.insert(*correlation_id, handler);
545
546 Ok(())
547 }
548}
549
550#[cfg(test)]
551mod tests {
552 use rand::{RngExt, SeedableRng, rngs::StdRng};
553 use rstest::rstest;
554 use ustr::Ustr;
555
556 use super::*;
557 use crate::msgbus::{
558 self, ShareableMessageHandler, get_message_bus,
559 matching::is_matching_backtracking,
560 stubs::{get_call_check_handler, get_stub_shareable_handler},
561 subscriptions_count_any,
562 };
563
564 #[rstest]
565 fn test_new() {
566 let trader_id = TraderId::default();
567 let msgbus = MessageBus::new(trader_id, UUID4::new(), None, None);
568
569 assert_eq!(msgbus.trader_id, trader_id);
570 assert_eq!(msgbus.name, stringify!(MessageBus));
571 }
572
573 #[rstest]
574 fn test_endpoints_when_no_endpoints() {
575 let msgbus = get_message_bus();
576 assert!(msgbus.borrow().endpoints().is_empty());
577 }
578
579 #[rstest]
580 fn test_topics_when_no_subscriptions() {
581 let msgbus = get_message_bus();
582 assert!(msgbus.borrow().patterns().is_empty());
583 assert!(!msgbus.borrow().has_subscribers("my-topic"));
584 }
585
586 #[rstest]
587 fn test_is_subscribed_when_no_subscriptions() {
588 let msgbus = get_message_bus();
589 let handler = get_stub_shareable_handler(None);
590
591 assert!(!msgbus.borrow().is_subscribed("my-topic", handler));
592 }
593
594 #[rstest]
595 fn test_get_response_handler_when_no_handler() {
596 let msgbus = get_message_bus();
597 let msgbus_ref = msgbus.borrow();
598 let handler = msgbus_ref.get_response_handler(&UUID4::new());
599 assert!(handler.is_none());
600 }
601
602 #[rstest]
603 fn test_get_response_handler_when_already_registered() {
604 let msgbus = get_message_bus();
605 let mut msgbus_ref = msgbus.borrow_mut();
606 let handler = get_stub_shareable_handler(None);
607
608 let request_id = UUID4::new();
609 msgbus_ref
610 .register_response_handler(&request_id, handler.clone())
611 .unwrap();
612
613 let result = msgbus_ref.register_response_handler(&request_id, handler);
614 assert!(result.is_err());
615 }
616
617 #[rstest]
618 fn test_get_response_handler_when_registered() {
619 let msgbus = get_message_bus();
620 let mut msgbus_ref = msgbus.borrow_mut();
621 let handler = get_stub_shareable_handler(None);
622
623 let request_id = UUID4::new();
624 msgbus_ref
625 .register_response_handler(&request_id, handler)
626 .unwrap();
627
628 let handler = msgbus_ref.get_response_handler(&request_id).unwrap();
629 assert_eq!(handler.id(), handler.id());
630 }
631
632 #[rstest]
633 fn test_is_registered_when_no_registrations() {
634 let msgbus = get_message_bus();
635 assert!(!msgbus.borrow().is_registered("MyEndpoint"));
636 }
637
638 #[rstest]
639 fn test_register_endpoint() {
640 let msgbus = get_message_bus();
641 let endpoint = "MyEndpoint".into();
642 let handler = get_stub_shareable_handler(None);
643
644 msgbus::register_any(endpoint, handler);
645
646 assert_eq!(msgbus.borrow().endpoints(), vec![endpoint.to_string()]);
647 assert!(msgbus.borrow().get_endpoint(endpoint).is_some());
648 }
649
650 #[rstest]
651 fn test_endpoint_send() {
652 let msgbus = get_message_bus();
653 let endpoint = "MyEndpoint".into();
654 let (handler, checker) = get_call_check_handler(None);
655
656 msgbus::register_any(endpoint, handler);
657 assert!(msgbus.borrow().get_endpoint(endpoint).is_some());
658 assert!(!checker.was_called());
659
660 msgbus::send_any(endpoint, &"Test Message");
662 assert!(checker.was_called());
663 }
664
665 #[rstest]
666 fn test_deregsiter_endpoint() {
667 let msgbus = get_message_bus();
668 let endpoint = "MyEndpoint".into();
669 let handler = get_stub_shareable_handler(None);
670
671 msgbus::register_any(endpoint, handler);
672 msgbus::deregister_any(endpoint);
673
674 assert!(msgbus.borrow().endpoints().is_empty());
675 }
676
677 #[rstest]
678 fn test_subscribe() {
679 let msgbus = get_message_bus();
680 let topic = "my-topic";
681 let handler = get_stub_shareable_handler(None);
682
683 msgbus::subscribe_any(topic.into(), handler, Some(1));
684
685 assert!(msgbus.borrow().has_subscribers(topic));
686 assert_eq!(msgbus.borrow().patterns(), vec![topic]);
687 }
688
689 #[rstest]
690 fn test_unsubscribe() {
691 let msgbus = get_message_bus();
692 let topic = "my-topic";
693 let handler = get_stub_shareable_handler(None);
694
695 msgbus::subscribe_any(topic.into(), handler.clone(), None);
696 msgbus::unsubscribe_any(topic.into(), handler);
697
698 assert!(!msgbus.borrow().has_subscribers(topic));
699 assert!(msgbus.borrow().patterns().is_empty());
700 }
701
702 #[rstest]
703 fn test_matching_subscriptions() {
704 let msgbus = get_message_bus();
705 let pattern = "my-pattern";
706
707 let handler_id1 = Ustr::from("1");
708 let handler1 = get_stub_shareable_handler(Some(handler_id1));
709
710 let handler_id2 = Ustr::from("2");
711 let handler2 = get_stub_shareable_handler(Some(handler_id2));
712
713 let handler_id3 = Ustr::from("3");
714 let handler3 = get_stub_shareable_handler(Some(handler_id3));
715
716 let handler_id4 = Ustr::from("4");
717 let handler4 = get_stub_shareable_handler(Some(handler_id4));
718
719 msgbus::subscribe_any(pattern.into(), handler1, None);
720 msgbus::subscribe_any(pattern.into(), handler2, None);
721 msgbus::subscribe_any(pattern.into(), handler3, Some(1));
722 msgbus::subscribe_any(pattern.into(), handler4, Some(2));
723
724 assert_eq!(
725 msgbus.borrow().patterns(),
726 vec![pattern, pattern, pattern, pattern]
727 );
728 assert_eq!(subscriptions_count_any(pattern), 4);
729
730 let topic = pattern;
731 let subs = msgbus.borrow_mut().matching_subscriptions(topic);
732 assert_eq!(subs.len(), 4);
733 assert_eq!(subs[0].handler_id, handler_id4);
734 assert_eq!(subs[1].handler_id, handler_id3);
735 assert_eq!(subs[2].handler_id, handler_id1);
736 assert_eq!(subs[3].handler_id, handler_id2);
737 }
738
739 #[rstest]
740 fn test_subscription_pattern_matching() {
741 let msgbus = get_message_bus();
742 let handler1 = get_stub_shareable_handler(Some(Ustr::from("1")));
743 let handler2 = get_stub_shareable_handler(Some(Ustr::from("2")));
744 let handler3 = get_stub_shareable_handler(Some(Ustr::from("3")));
745
746 msgbus::subscribe_any("data.quotes.*".into(), handler1, None);
747 msgbus::subscribe_any("data.trades.*".into(), handler2, None);
748 msgbus::subscribe_any("data.*.BINANCE.*".into(), handler3, None);
749 assert_eq!(msgbus.borrow().subscriptions().len(), 3);
750
751 let topic = "data.quotes.BINANCE.ETHUSDT";
752 assert_eq!(msgbus.borrow().find_topic_matches(topic.into()).len(), 2);
753
754 let matches = msgbus.borrow_mut().matching_subscriptions(topic);
755 assert_eq!(matches.len(), 2);
756 assert_eq!(matches[0].handler_id, Ustr::from("3"));
757 assert_eq!(matches[1].handler_id, Ustr::from("1"));
758 }
759
    /// Minimal reference model of the bus's subscription bookkeeping, used by
    /// the fuzz test to cross-check the real implementation.
    struct SimpleSubscriptionModel {
        /// Active subscriptions as `(pattern, handler_id)` pairs, without duplicates.
        subscriptions: Vec<(String, String)>,
    }
765
766 impl SimpleSubscriptionModel {
767 fn new() -> Self {
768 Self {
769 subscriptions: Vec::new(),
770 }
771 }
772
773 fn subscribe(&mut self, pattern: &str, handler_id: &str) {
774 let subscription = (pattern.to_string(), handler_id.to_string());
775 if !self.subscriptions.contains(&subscription) {
776 self.subscriptions.push(subscription);
777 }
778 }
779
780 fn unsubscribe(&mut self, pattern: &str, handler_id: &str) -> bool {
781 let subscription = (pattern.to_string(), handler_id.to_string());
782 if let Some(idx) = self.subscriptions.iter().position(|s| s == &subscription) {
783 self.subscriptions.remove(idx);
784 true
785 } else {
786 false
787 }
788 }
789
790 fn is_subscribed(&self, pattern: &str, handler_id: &str) -> bool {
791 self.subscriptions
792 .contains(&(pattern.to_string(), handler_id.to_string()))
793 }
794
795 fn matching_subscriptions(&self, topic: &str) -> Vec<(String, String)> {
796 let topic = topic.into();
797
798 self.subscriptions
799 .iter()
800 .filter(|(pat, _)| is_matching_backtracking(topic, pat.into()))
801 .map(|(pat, id)| (pat.clone(), id.clone()))
802 .collect()
803 }
804
805 fn subscription_count(&self) -> usize {
806 self.subscriptions.len()
807 }
808 }
809
    /// Model-based fuzz test: applies 50,000 seeded random operations to both
    /// the message bus and `SimpleSubscriptionModel` and checks they agree
    /// after every step.
    ///
    /// The RNG seed is fixed, so the operation sequence is reproducible.
    #[rstest]
    fn subscription_model_fuzz_testing() {
        let mut rng = StdRng::seed_from_u64(42);

        let msgbus = get_message_bus();
        let mut model = SimpleSubscriptionModel::new();

        // (handler_id, handler) pairs shared by the bus and the model.
        let mut handlers: Vec<(String, ShareableMessageHandler)> = Vec::new();

        // Pool of patterns that operations draw from.
        let patterns = generate_test_patterns(&mut rng);

        // Fixed set of 50 handler IDs.
        let handler_ids: Vec<String> = (0..50).map(|i| format!("handler_{i}")).collect();

        for id in &handler_ids {
            let handler = get_stub_shareable_handler(Some(Ustr::from(id)));
            handlers.push((id.clone(), handler));
        }

        let num_operations = 50_000;
        for op_num in 0..num_operations {
            // 0 = subscribe, 1 = unsubscribe, 2 = is_subscribed check, 3 = topic match check.
            let operation = rng.random_range(0..4);

            match operation {
                // Subscribe a random handler to a random pattern.
                0 => {
                    let pattern_idx = rng.random_range(0..patterns.len());
                    let handler_idx = rng.random_range(0..handlers.len());
                    let pattern = &patterns[pattern_idx];
                    let (handler_id, handler) = &handlers[handler_idx];

                    // Apply to the model (duplicates are ignored there).
                    model.subscribe(pattern, handler_id);

                    // Apply to the bus.
                    msgbus::subscribe_any(pattern.as_str().into(), handler.clone(), None);

                    assert_eq!(
                        model.subscription_count(),
                        msgbus.borrow().subscriptions().len()
                    );

                    assert!(
                        msgbus.borrow().is_subscribed(pattern, handler.clone()),
                        "Op {op_num}: is_subscribed should return true after subscribe"
                    );
                }

                // Unsubscribe a random existing subscription (skipped when the model is empty).
                1 => {
                    if model.subscription_count() > 0 {
                        let sub_idx = rng.random_range(0..model.subscription_count());
                        let (pattern, handler_id) = model.subscriptions[sub_idx].clone();

                        // Apply to the model.
                        model.unsubscribe(&pattern, &handler_id);

                        // Look up the handler object that belongs to this ID.
                        let handler = handlers
                            .iter()
                            .find(|(id, _)| id == &handler_id)
                            .map(|(_, h)| h.clone())
                            .unwrap();

                        // Apply to the bus.
                        msgbus::unsubscribe_any(pattern.as_str().into(), handler.clone());

                        assert_eq!(
                            model.subscription_count(),
                            msgbus.borrow().subscriptions().len()
                        );
                        assert!(
                            !msgbus.borrow().is_subscribed(pattern, handler.clone()),
                            "Op {op_num}: is_subscribed should return false after unsubscribe"
                        );
                    }
                }

                // Compare is_subscribed for a random (pattern, handler) pair.
                2 => {
                    let pattern_idx = rng.random_range(0..patterns.len());
                    let handler_idx = rng.random_range(0..handlers.len());
                    let pattern = &patterns[pattern_idx];
                    let (handler_id, handler) = &handlers[handler_idx];

                    let expected = model.is_subscribed(pattern, handler_id);
                    let actual = msgbus.borrow().is_subscribed(pattern, handler.clone());

                    assert_eq!(
                        expected, actual,
                        "Op {op_num}: Subscription state mismatch for pattern '{pattern}', handler '{handler_id}': expected={expected}, actual={actual}"
                    );
                }

                // Compare the set of subscriptions matching a random topic.
                3 => {
                    let topic = create_topic(&mut rng);

                    let actual_matches = msgbus.borrow_mut().matching_subscriptions(topic);
                    let expected_matches = model.matching_subscriptions(&topic);

                    assert_eq!(
                        expected_matches.len(),
                        actual_matches.len(),
                        "Op {}: Match count mismatch for topic '{}': expected={}, actual={}",
                        op_num,
                        topic,
                        expected_matches.len(),
                        actual_matches.len()
                    );

                    // Counts are equal, so checking containment one way is enough
                    // as long as neither side holds duplicates.
                    for sub in &actual_matches {
                        assert!(
                            expected_matches
                                .contains(&(sub.pattern.to_string(), sub.handler_id.to_string())),
                            "Op {}: Expected match not found: pattern='{}', handler_id='{}'",
                            op_num,
                            sub.pattern,
                            sub.handler_id
                        );
                    }
                }
                _ => unreachable!(),
            }
        }
    }
941
942 fn generate_pattern_from_topic(topic: &str, rng: &mut StdRng) -> String {
943 let mut pattern = String::new();
944
945 for c in topic.chars() {
946 let val: f64 = rng.random();
947 if val < 0.1 {
948 pattern.push('*');
949 } else if val < 0.3 {
950 pattern.push('?');
951 } else if val < 0.5 {
952 continue;
953 } else {
954 pattern.push(c);
955 };
956 }
957
958 pattern
959 }
960
    /// Builds the pattern pool for the fuzz test: six hand-written wildcard
    /// patterns plus 50 generated ones.
    ///
    /// Each generated entry is either a duplicate of an existing pattern
    /// (draws 0 or 1 out of 0..10) or a new pattern derived from a random topic.
    fn generate_test_patterns(rng: &mut StdRng) -> Vec<String> {
        let mut patterns = vec![
            "data.*.*.*".to_string(),
            "*.*.BINANCE.*".to_string(),
            "events.order.*".to_string(),
            "data.*.*.?USDT".to_string(),
            "*.trades.*.BTC*".to_string(),
            "*.*.*.*".to_string(),
        ];

        for _ in 0..50 {
            match rng.random_range(0..10) {
                // Duplicate an existing pattern, so the pool contains repeats.
                0..=1 => {
                    let idx = rng.random_range(0..patterns.len());
                    patterns.push(patterns[idx].clone());
                }
                // Derive a fresh pattern from a random topic.
                _ => {
                    let topic = create_topic(rng);
                    let pattern = generate_pattern_from_topic(&topic, rng);
                    patterns.push(pattern);
                }
            }
        }

        patterns
    }
990
991 fn create_topic(rng: &mut StdRng) -> Ustr {
992 let cat = ["data", "info", "order"];
993 let model = ["quotes", "trades", "orderbooks", "depths"];
994 let venue = ["BINANCE", "BYBIT", "OKX", "FTX", "KRAKEN"];
995 let instrument = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT"];
996
997 let cat = cat[rng.random_range(0..cat.len())];
998 let model = model[rng.random_range(0..model.len())];
999 let venue = venue[rng.random_range(0..venue.len())];
1000 let instrument = instrument[rng.random_range(0..instrument.len())];
1001 Ustr::from(&format!("{cat}.{model}.{venue}.{instrument}"))
1002 }
1003}