Skip to main content

nautilus_common/msgbus/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core message bus implementation.
17//!
18//! # Design decisions
19//!
20//! ## Why two routing mechanisms?
21//!
22//! The message bus provides typed and Any-based routing to balance performance
23//! and flexibility:
24//!
25//! **Typed routing** optimizes for throughput on known data types:
26//! - `TopicRouter<T>` for pub/sub, `EndpointMap<T>` for point-to-point.
27//! - Handlers implement `Handler<T>`, receive `&T` directly.
28//! - No runtime type checking enables inlining and static dispatch.
//! - Built-in routers include `QuoteTick`, `TradeTick`, `Bar`, `OrderBookDeltas`,
//!   `OrderBookDepth10`, `OrderEventAny`, `PositionEvent`, and `AccountState`
//!   (see the `router_*` fields on `MessageBus` for the full set).
31//!
32//! **Any-based routing** provides flexibility for extensibility:
33//! - `subscriptions`/`topics` maps with `ShareableMessageHandler`.
34//! - Handlers implement `Handler<dyn Any>`, receive `&dyn Any`.
35//! - Supports arbitrary message types without modifying the bus.
36//! - Required for Python interop where types aren't known at compile time.
37//!
38//! ## Handler semantics
39//!
40//! **Typed handlers receive `&T` references:**
41//! - Same message delivered to N handlers without cloning.
42//! - Handler decides whether to clone (only if storing).
43//! - Zero-cost for `Copy` types (`QuoteTick`, `TradeTick`, `Bar`).
44//! - Efficient for large types (`OrderBookDeltas`).
45//!
46//! **Any-based handlers pay per-handler overhead:**
47//! - Each handler receives `&dyn Any`, must downcast to `&T`.
48//! - N handlers = N downcasts + N potential clones.
49//! - Runtime type checking on every dispatch.
50//!
51//! ## Performance trade-off
52//!
53//! Typed routing is faster (see `benches/msgbus_typed.rs`, AMD Ryzen 9 7950X):
54//!
55//! | Scenario                    | Typed vs Any |
56//! |-----------------------------|--------------|
57//! | Handler dispatch (noop)     | ~10x faster  |
58//! | Router with 5 subscribers   | ~3.5x faster |
59//! | Router with 10 subscribers  | ~2x faster   |
60//! | High volume (1M messages)   | ~7% faster   |
61//!
62//! Any-based routing pays for flexibility with runtime type checking. Use
63//! typed routing for hot-path data; Any-based for custom types and Python.
64//!
65//! ## Routing paths are separate
66//!
67//! Typed and Any-based routing use separate data structures:
68//! - `publish_quote` routes through `router_quotes`.
69//! - `publish_any` routes through `topics`.
70//!
71//! Publishers and subscribers must use matching APIs. Mixing them causes
72//! silent message loss.
73//!
74//! ## When to use each
75//!
76//! **Typed** (`publish_quote`, `subscribe_quotes`, etc.):
77//! - Market data (quotes, trades, bars, order book updates).
78//! - Order and position events.
79//! - High-frequency data with known types.
80//!
81//! **Any-based** (`publish_any`, `subscribe_any`):
82//! - Custom or user-defined data types.
83//! - Low-frequency messages.
84//! - Python callbacks.
85
86use std::{
87    any::{Any, TypeId},
88    cell::RefCell,
89    collections::HashMap,
90    hash::{Hash, Hasher},
91    rc::Rc,
92};
93
94use ahash::{AHashMap, AHashSet};
95use indexmap::IndexMap;
96use nautilus_core::{UUID4, correctness::FAILED};
97use nautilus_model::{
98    data::{
99        Bar, Data, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate,
100        OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
101    },
102    events::{AccountState, OrderEventAny, PositionEvent},
103    identifiers::TraderId,
104    orderbook::OrderBook,
105    orders::OrderAny,
106    position::Position,
107};
108use smallvec::SmallVec;
109use ustr::Ustr;
110
111use super::{
112    ShareableMessageHandler,
113    matching::is_matching_backtracking,
114    mstr::{Endpoint, MStr, Pattern, Topic},
115    set_message_bus,
116    switchboard::MessagingSwitchboard,
117    typed_endpoints::{EndpointMap, IntoEndpointMap},
118    typed_router::TopicRouter,
119};
120use crate::messages::{
121    data::{DataCommand, DataResponse},
122    execution::{ExecutionReport, TradingCommand},
123};
124
/// Represents a subscription to a particular topic.
///
/// This is an internal type intended to be used by the message bus to organize
/// topics and their subscribers.
///
/// Equality and hashing consider only `pattern` and `handler_id`; `priority`
/// only takes part in ordering.
#[derive(Clone, Debug)]
pub struct Subscription {
    /// The shareable message handler for the subscription.
    pub handler: ShareableMessageHandler,
    /// Store a copy of the handler ID for faster equality checks.
    pub handler_id: Ustr,
    /// The pattern for the subscription.
    pub pattern: MStr<Pattern>,
    /// The priority for the subscription determines the ordering of handlers receiving
    /// messages being processed, higher priority handlers will receive messages before
    /// lower priority handlers.
    pub priority: u8,
}
143
144impl Subscription {
145    /// Creates a new [`Subscription`] instance.
146    #[must_use]
147    pub fn new(
148        pattern: MStr<Pattern>,
149        handler: ShareableMessageHandler,
150        priority: Option<u8>,
151    ) -> Self {
152        Self {
153            handler_id: handler.0.id(),
154            pattern,
155            handler,
156            priority: priority.unwrap_or(0),
157        }
158    }
159}
160
161impl PartialEq<Self> for Subscription {
162    fn eq(&self, other: &Self) -> bool {
163        self.pattern == other.pattern && self.handler_id == other.handler_id
164    }
165}
166
167impl Eq for Subscription {}
168
169impl PartialOrd for Subscription {
170    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
171        Some(self.cmp(other))
172    }
173}
174
175impl Ord for Subscription {
176    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
177        other
178            .priority
179            .cmp(&self.priority)
180            .then_with(|| self.pattern.cmp(&other.pattern))
181            .then_with(|| self.handler_id.cmp(&other.handler_id))
182    }
183}
184
185impl Hash for Subscription {
186    fn hash<H: Hasher>(&self, state: &mut H) {
187        self.pattern.hash(state);
188        self.handler_id.hash(state);
189    }
190}
191
/// A generic message bus to facilitate various messaging patterns.
///
/// The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as
/// well as direct point-to-point messaging to registered endpoints.
///
/// Pub/Sub wildcard patterns for hierarchical topics are possible:
///  - `*` asterisk matches zero or more characters in a topic.
///  - `?` question mark matches exactly one character in a topic.
///
/// The asterisk in a wildcard matches any character zero or more times. For
/// example, `comp*` matches anything beginning with `comp` which means `comp`,
/// `complete`, and `computer` are all matched.
///
/// A question mark matches a single character once. For example, `c?mp` matches
/// `camp` and `comp`. The question mark can also be used more than once.
/// For example, `c??p` would match both of the above examples and `coop`.
#[derive(Debug)]
pub struct MessageBus {
    /// The trader ID associated with the message bus.
    pub trader_id: TraderId,
    /// The instance ID associated with the message bus.
    pub instance_id: UUID4,
    /// The name for the message bus.
    pub name: String,
    /// If the message bus is backed by a database.
    pub has_backing: bool,
    /// The messaging switchboard, exposed through `switchboard()`.
    pub(crate) switchboard: MessagingSwitchboard,
    /// All active Any-based subscriptions.
    pub(crate) subscriptions: AHashSet<Subscription>,
    /// Cache of the sorted subscriptions matching each topic, filled lazily
    /// the first time a topic is resolved.
    pub(crate) topics: IndexMap<MStr<Topic>, Vec<Subscription>>,
    /// Any-based endpoint handlers keyed by endpoint address.
    pub(crate) endpoints: IndexMap<MStr<Endpoint>, ShareableMessageHandler>,
    /// Response handlers keyed by request correlation ID.
    pub(crate) correlation_index: AHashMap<UUID4, ShareableMessageHandler>,
    // Typed pub/sub routers, one per built-in message type.
    pub(crate) router_quotes: TopicRouter<QuoteTick>,
    pub(crate) router_trades: TopicRouter<TradeTick>,
    pub(crate) router_bars: TopicRouter<Bar>,
    pub(crate) router_deltas: TopicRouter<OrderBookDeltas>,
    pub(crate) router_depth10: TopicRouter<OrderBookDepth10>,
    pub(crate) router_book_snapshots: TopicRouter<OrderBook>,
    pub(crate) router_mark_prices: TopicRouter<MarkPriceUpdate>,
    pub(crate) router_index_prices: TopicRouter<IndexPriceUpdate>,
    pub(crate) router_funding_rates: TopicRouter<FundingRateUpdate>,
    pub(crate) router_order_events: TopicRouter<OrderEventAny>,
    pub(crate) router_position_events: TopicRouter<PositionEvent>,
    pub(crate) router_account_state: TopicRouter<AccountState>,
    pub(crate) router_orders: TopicRouter<OrderAny>,
    pub(crate) router_positions: TopicRouter<Position>,
    pub(crate) router_greeks: TopicRouter<GreeksData>,
    // Typed DeFi routers and endpoints, only compiled with the `defi` feature.
    #[cfg(feature = "defi")]
    pub(crate) router_defi_blocks: TopicRouter<nautilus_model::defi::Block>, // nautilus-import-ok
    #[cfg(feature = "defi")]
    pub(crate) router_defi_pools: TopicRouter<nautilus_model::defi::Pool>, // nautilus-import-ok
    #[cfg(feature = "defi")]
    pub(crate) router_defi_swaps: TopicRouter<nautilus_model::defi::PoolSwap>, // nautilus-import-ok
    #[cfg(feature = "defi")]
    pub(crate) router_defi_liquidity: TopicRouter<nautilus_model::defi::PoolLiquidityUpdate>, // nautilus-import-ok
    #[cfg(feature = "defi")]
    pub(crate) router_defi_collects: TopicRouter<nautilus_model::defi::PoolFeeCollect>, // nautilus-import-ok
    #[cfg(feature = "defi")]
    pub(crate) router_defi_flash: TopicRouter<nautilus_model::defi::PoolFlash>, // nautilus-import-ok
    #[cfg(feature = "defi")]
    pub(crate) endpoints_defi_data: IntoEndpointMap<nautilus_model::defi::DefiData>, // nautilus-import-ok
    // Typed point-to-point endpoint maps, one per built-in message type.
    pub(crate) endpoints_quotes: EndpointMap<QuoteTick>,
    pub(crate) endpoints_trades: EndpointMap<TradeTick>,
    pub(crate) endpoints_bars: EndpointMap<Bar>,
    pub(crate) endpoints_account_state: EndpointMap<AccountState>,
    pub(crate) endpoints_trading_commands: IntoEndpointMap<TradingCommand>,
    pub(crate) endpoints_data_commands: IntoEndpointMap<DataCommand>,
    pub(crate) endpoints_data_responses: IntoEndpointMap<DataResponse>,
    pub(crate) endpoints_exec_reports: IntoEndpointMap<ExecutionReport>,
    pub(crate) endpoints_order_events: IntoEndpointMap<OrderEventAny>,
    pub(crate) endpoints_data: IntoEndpointMap<Data>,
    /// Typed routers for custom message types, created on demand by `router`
    /// and keyed by the message type's `TypeId`.
    routers_typed: AHashMap<TypeId, Box<dyn Any>>,
    /// Typed endpoint maps for custom message types, created on demand by
    /// `endpoint_map` and keyed by the message type's `TypeId`.
    endpoints_typed: AHashMap<TypeId, Box<dyn Any>>,
}
269
270impl Default for MessageBus {
271    /// Creates a new default [`MessageBus`] instance.
272    fn default() -> Self {
273        Self::new(TraderId::from("TRADER-001"), UUID4::new(), None, None)
274    }
275}
276
277impl MessageBus {
278    /// Creates a new [`MessageBus`] instance.
279    #[must_use]
280    pub fn new(
281        trader_id: TraderId,
282        instance_id: UUID4,
283        name: Option<String>,
284        _config: Option<HashMap<String, serde_json::Value>>,
285    ) -> Self {
286        Self {
287            trader_id,
288            instance_id,
289            name: name.unwrap_or(stringify!(MessageBus).to_owned()),
290            switchboard: MessagingSwitchboard::default(),
291            subscriptions: AHashSet::new(),
292            topics: IndexMap::new(),
293            endpoints: IndexMap::new(),
294            correlation_index: AHashMap::new(),
295            has_backing: false,
296            router_quotes: TopicRouter::new(),
297            router_trades: TopicRouter::new(),
298            router_bars: TopicRouter::new(),
299            router_deltas: TopicRouter::new(),
300            router_depth10: TopicRouter::new(),
301            router_book_snapshots: TopicRouter::new(),
302            router_mark_prices: TopicRouter::new(),
303            router_index_prices: TopicRouter::new(),
304            router_funding_rates: TopicRouter::new(),
305            router_order_events: TopicRouter::new(),
306            router_position_events: TopicRouter::new(),
307            router_account_state: TopicRouter::new(),
308            router_orders: TopicRouter::new(),
309            router_positions: TopicRouter::new(),
310            router_greeks: TopicRouter::new(),
311            #[cfg(feature = "defi")]
312            router_defi_blocks: TopicRouter::new(),
313            #[cfg(feature = "defi")]
314            router_defi_pools: TopicRouter::new(),
315            #[cfg(feature = "defi")]
316            router_defi_swaps: TopicRouter::new(),
317            #[cfg(feature = "defi")]
318            router_defi_liquidity: TopicRouter::new(),
319            #[cfg(feature = "defi")]
320            router_defi_collects: TopicRouter::new(),
321            #[cfg(feature = "defi")]
322            router_defi_flash: TopicRouter::new(),
323            #[cfg(feature = "defi")]
324            endpoints_defi_data: IntoEndpointMap::new(),
325            endpoints_quotes: EndpointMap::new(),
326            endpoints_trades: EndpointMap::new(),
327            endpoints_bars: EndpointMap::new(),
328            endpoints_account_state: EndpointMap::new(),
329            endpoints_trading_commands: IntoEndpointMap::new(),
330            endpoints_data_commands: IntoEndpointMap::new(),
331            endpoints_data_responses: IntoEndpointMap::new(),
332            endpoints_exec_reports: IntoEndpointMap::new(),
333            endpoints_order_events: IntoEndpointMap::new(),
334            endpoints_data: IntoEndpointMap::new(),
335            routers_typed: AHashMap::new(),
336            endpoints_typed: AHashMap::new(),
337        }
338    }
339
340    /// Registers message bus for the current thread.
341    pub fn register_message_bus(self) -> Rc<RefCell<Self>> {
342        let msgbus = Rc::new(RefCell::new(self));
343        set_message_bus(msgbus.clone());
344        msgbus
345    }
346
347    /// Gets or creates a typed router for custom message type `T`.
348    ///
349    /// # Panics
350    ///
351    /// Panics if the stored router type doesn't match `T` (internal bug).
352    pub fn router<T: 'static>(&mut self) -> &mut TopicRouter<T> {
353        self.routers_typed
354            .entry(TypeId::of::<T>())
355            .or_insert_with(|| Box::new(TopicRouter::<T>::new()))
356            .downcast_mut::<TopicRouter<T>>()
357            .expect("TopicRouter type mismatch - this is a bug")
358    }
359
360    /// Gets or creates a typed endpoint map for custom message type `T`.
361    ///
362    /// # Panics
363    ///
364    /// Panics if the stored endpoint map type doesn't match `T` (internal bug).
365    pub fn endpoint_map<T: 'static>(&mut self) -> &mut EndpointMap<T> {
366        self.endpoints_typed
367            .entry(TypeId::of::<T>())
368            .or_insert_with(|| Box::new(EndpointMap::<T>::new()))
369            .downcast_mut::<EndpointMap<T>>()
370            .expect("EndpointMap type mismatch - this is a bug")
371    }
372
373    /// Returns the memory address of this instance as a hexadecimal string.
374    #[must_use]
375    pub fn mem_address(&self) -> String {
376        format!("{self:p}")
377    }
378
    /// Returns a shared reference to the bus's messaging switchboard.
    #[must_use]
    pub fn switchboard(&self) -> &MessagingSwitchboard {
        &self.switchboard
    }
384
385    /// Returns the registered endpoint addresses.
386    #[must_use]
387    pub fn endpoints(&self) -> Vec<&str> {
388        self.endpoints.iter().map(|e| e.0.as_str()).collect()
389    }
390
391    /// Returns actively subscribed patterns.
392    #[must_use]
393    pub fn patterns(&self) -> Vec<&str> {
394        self.subscriptions
395            .iter()
396            .map(|s| s.pattern.as_str())
397            .collect()
398    }
399
400    /// Returns whether there are subscribers for the `topic`.
401    pub fn has_subscribers<T: AsRef<str>>(&self, topic: T) -> bool {
402        self.subscriptions_count(topic) > 0
403    }
404
405    /// Returns the count of subscribers for the `topic`.
406    ///
407    /// # Panics
408    ///
409    /// Returns an error if the topic is not valid.
410    #[must_use]
411    pub fn subscriptions_count<T: AsRef<str>>(&self, topic: T) -> usize {
412        let topic = MStr::<Topic>::topic(topic).expect(FAILED);
413        self.topics
414            .get(&topic)
415            .map_or_else(|| self.find_topic_matches(topic).len(), |subs| subs.len())
416    }
417
418    /// Returns active subscriptions.
419    #[must_use]
420    pub fn subscriptions(&self) -> Vec<&Subscription> {
421        self.subscriptions.iter().collect()
422    }
423
424    /// Returns the handler IDs for actively subscribed patterns.
425    #[must_use]
426    pub fn subscription_handler_ids(&self) -> Vec<&str> {
427        self.subscriptions
428            .iter()
429            .map(|s| s.handler_id.as_str())
430            .collect()
431    }
432
433    /// Returns whether the endpoint is registered.
434    ///
435    /// # Panics
436    ///
437    /// Returns an error if the endpoint is not valid topic string.
438    #[must_use]
439    pub fn is_registered<T: Into<MStr<Endpoint>>>(&self, endpoint: T) -> bool {
440        let endpoint: MStr<Endpoint> = endpoint.into();
441        self.endpoints.contains_key(&endpoint)
442    }
443
444    /// Returns whether the `handler` is subscribed to the `pattern`.
445    #[must_use]
446    pub fn is_subscribed<T: AsRef<str>>(
447        &self,
448        pattern: T,
449        handler: ShareableMessageHandler,
450    ) -> bool {
451        let pattern = MStr::<Pattern>::pattern(pattern);
452        let sub = Subscription::new(pattern, handler, None);
453        self.subscriptions.contains(&sub)
454    }
455
    /// Closes the message bus.
    ///
    /// Currently a no-op that always succeeds: the backing database is not yet
    /// integrated, so there is nothing to shut down.
    ///
    /// # Errors
    ///
    /// This function never returns an error (TBD once backing database added).
    pub const fn close(&self) -> anyhow::Result<()> {
        // TODO: Integrate the backing database
        Ok(())
    }
465
    /// Returns the Any-based handler registered for the `endpoint`, or `None`
    /// if no handler is registered under that address.
    #[must_use]
    pub fn get_endpoint(&self, endpoint: MStr<Endpoint>) -> Option<&ShareableMessageHandler> {
        self.endpoints.get(&endpoint)
    }
471
    /// Returns the response handler registered for the `correlation_id`, or
    /// `None` if no handler has been registered for it.
    #[must_use]
    pub fn get_response_handler(&self, correlation_id: &UUID4) -> Option<&ShareableMessageHandler> {
        self.correlation_index.get(correlation_id)
    }
477
478    /// Finds the subscriptions with pattern matching the `topic`.
479    pub(crate) fn find_topic_matches(&self, topic: MStr<Topic>) -> Vec<Subscription> {
480        self.subscriptions
481            .iter()
482            .filter_map(|sub| {
483                if is_matching_backtracking(topic, sub.pattern) {
484                    Some(sub.clone())
485                } else {
486                    None
487                }
488            })
489            .collect()
490    }
491
492    /// Finds the subscriptions which match the `topic` and caches the
493    /// results in the `patterns` map.
494    #[must_use]
495    pub fn matching_subscriptions<T: Into<MStr<Topic>>>(&mut self, topic: T) -> Vec<Subscription> {
496        self.inner_matching_subscriptions(topic.into())
497    }
498
499    pub(crate) fn inner_matching_subscriptions(&mut self, topic: MStr<Topic>) -> Vec<Subscription> {
500        self.topics.get(&topic).cloned().unwrap_or_else(|| {
501            let mut matches = self.find_topic_matches(topic);
502            matches.sort();
503            self.topics.insert(topic, matches.clone());
504            matches
505        })
506    }
507
508    /// Fills a buffer with handlers matching a topic.
509    pub(crate) fn fill_matching_any_handlers(
510        &mut self,
511        topic: MStr<Topic>,
512        buf: &mut SmallVec<[ShareableMessageHandler; 64]>,
513    ) {
514        if let Some(subs) = self.topics.get(&topic) {
515            for sub in subs {
516                buf.push(sub.handler.clone());
517            }
518        } else {
519            let mut matches = self.find_topic_matches(topic);
520            matches.sort();
521
522            for sub in &matches {
523                buf.push(sub.handler.clone());
524            }
525
526            self.topics.insert(topic, matches);
527        }
528    }
529
530    /// Registers a response handler for a specific correlation ID.
531    ///
532    /// # Errors
533    ///
534    /// Returns an error if `handler` is already registered for the `correlation_id`.
535    pub fn register_response_handler(
536        &mut self,
537        correlation_id: &UUID4,
538        handler: ShareableMessageHandler,
539    ) -> anyhow::Result<()> {
540        if self.correlation_index.contains_key(correlation_id) {
541            anyhow::bail!("Correlation ID <{correlation_id}> already has a registered handler");
542        }
543
544        self.correlation_index.insert(*correlation_id, handler);
545
546        Ok(())
547    }
548}
549
550#[cfg(test)]
551mod tests {
552    use rand::{RngExt, SeedableRng, rngs::StdRng};
553    use rstest::rstest;
554    use ustr::Ustr;
555
556    use super::*;
557    use crate::msgbus::{
558        self, ShareableMessageHandler, get_message_bus,
559        matching::is_matching_backtracking,
560        stubs::{get_call_check_handler, get_stub_shareable_handler},
561        subscriptions_count_any,
562    };
563
564    #[rstest]
565    fn test_new() {
566        let trader_id = TraderId::default();
567        let msgbus = MessageBus::new(trader_id, UUID4::new(), None, None);
568
569        assert_eq!(msgbus.trader_id, trader_id);
570        assert_eq!(msgbus.name, stringify!(MessageBus));
571    }
572
573    #[rstest]
574    fn test_endpoints_when_no_endpoints() {
575        let msgbus = get_message_bus();
576        assert!(msgbus.borrow().endpoints().is_empty());
577    }
578
579    #[rstest]
580    fn test_topics_when_no_subscriptions() {
581        let msgbus = get_message_bus();
582        assert!(msgbus.borrow().patterns().is_empty());
583        assert!(!msgbus.borrow().has_subscribers("my-topic"));
584    }
585
586    #[rstest]
587    fn test_is_subscribed_when_no_subscriptions() {
588        let msgbus = get_message_bus();
589        let handler = get_stub_shareable_handler(None);
590
591        assert!(!msgbus.borrow().is_subscribed("my-topic", handler));
592    }
593
594    #[rstest]
595    fn test_get_response_handler_when_no_handler() {
596        let msgbus = get_message_bus();
597        let msgbus_ref = msgbus.borrow();
598        let handler = msgbus_ref.get_response_handler(&UUID4::new());
599        assert!(handler.is_none());
600    }
601
602    #[rstest]
603    fn test_get_response_handler_when_already_registered() {
604        let msgbus = get_message_bus();
605        let mut msgbus_ref = msgbus.borrow_mut();
606        let handler = get_stub_shareable_handler(None);
607
608        let request_id = UUID4::new();
609        msgbus_ref
610            .register_response_handler(&request_id, handler.clone())
611            .unwrap();
612
613        let result = msgbus_ref.register_response_handler(&request_id, handler);
614        assert!(result.is_err());
615    }
616
617    #[rstest]
618    fn test_get_response_handler_when_registered() {
619        let msgbus = get_message_bus();
620        let mut msgbus_ref = msgbus.borrow_mut();
621        let handler = get_stub_shareable_handler(None);
622
623        let request_id = UUID4::new();
624        msgbus_ref
625            .register_response_handler(&request_id, handler)
626            .unwrap();
627
628        let handler = msgbus_ref.get_response_handler(&request_id).unwrap();
629        assert_eq!(handler.id(), handler.id());
630    }
631
632    #[rstest]
633    fn test_is_registered_when_no_registrations() {
634        let msgbus = get_message_bus();
635        assert!(!msgbus.borrow().is_registered("MyEndpoint"));
636    }
637
638    #[rstest]
639    fn test_register_endpoint() {
640        let msgbus = get_message_bus();
641        let endpoint = "MyEndpoint".into();
642        let handler = get_stub_shareable_handler(None);
643
644        msgbus::register_any(endpoint, handler);
645
646        assert_eq!(msgbus.borrow().endpoints(), vec![endpoint.to_string()]);
647        assert!(msgbus.borrow().get_endpoint(endpoint).is_some());
648    }
649
650    #[rstest]
651    fn test_endpoint_send() {
652        let msgbus = get_message_bus();
653        let endpoint = "MyEndpoint".into();
654        let (handler, checker) = get_call_check_handler(None);
655
656        msgbus::register_any(endpoint, handler);
657        assert!(msgbus.borrow().get_endpoint(endpoint).is_some());
658        assert!(!checker.was_called());
659
660        // Send a message to the endpoint
661        msgbus::send_any(endpoint, &"Test Message");
662        assert!(checker.was_called());
663    }
664
665    #[rstest]
666    fn test_deregsiter_endpoint() {
667        let msgbus = get_message_bus();
668        let endpoint = "MyEndpoint".into();
669        let handler = get_stub_shareable_handler(None);
670
671        msgbus::register_any(endpoint, handler);
672        msgbus::deregister_any(endpoint);
673
674        assert!(msgbus.borrow().endpoints().is_empty());
675    }
676
677    #[rstest]
678    fn test_subscribe() {
679        let msgbus = get_message_bus();
680        let topic = "my-topic";
681        let handler = get_stub_shareable_handler(None);
682
683        msgbus::subscribe_any(topic.into(), handler, Some(1));
684
685        assert!(msgbus.borrow().has_subscribers(topic));
686        assert_eq!(msgbus.borrow().patterns(), vec![topic]);
687    }
688
689    #[rstest]
690    fn test_unsubscribe() {
691        let msgbus = get_message_bus();
692        let topic = "my-topic";
693        let handler = get_stub_shareable_handler(None);
694
695        msgbus::subscribe_any(topic.into(), handler.clone(), None);
696        msgbus::unsubscribe_any(topic.into(), handler);
697
698        assert!(!msgbus.borrow().has_subscribers(topic));
699        assert!(msgbus.borrow().patterns().is_empty());
700    }
701
702    #[rstest]
703    fn test_matching_subscriptions() {
704        let msgbus = get_message_bus();
705        let pattern = "my-pattern";
706
707        let handler_id1 = Ustr::from("1");
708        let handler1 = get_stub_shareable_handler(Some(handler_id1));
709
710        let handler_id2 = Ustr::from("2");
711        let handler2 = get_stub_shareable_handler(Some(handler_id2));
712
713        let handler_id3 = Ustr::from("3");
714        let handler3 = get_stub_shareable_handler(Some(handler_id3));
715
716        let handler_id4 = Ustr::from("4");
717        let handler4 = get_stub_shareable_handler(Some(handler_id4));
718
719        msgbus::subscribe_any(pattern.into(), handler1, None);
720        msgbus::subscribe_any(pattern.into(), handler2, None);
721        msgbus::subscribe_any(pattern.into(), handler3, Some(1));
722        msgbus::subscribe_any(pattern.into(), handler4, Some(2));
723
724        assert_eq!(
725            msgbus.borrow().patterns(),
726            vec![pattern, pattern, pattern, pattern]
727        );
728        assert_eq!(subscriptions_count_any(pattern), 4);
729
730        let topic = pattern;
731        let subs = msgbus.borrow_mut().matching_subscriptions(topic);
732        assert_eq!(subs.len(), 4);
733        assert_eq!(subs[0].handler_id, handler_id4);
734        assert_eq!(subs[1].handler_id, handler_id3);
735        assert_eq!(subs[2].handler_id, handler_id1);
736        assert_eq!(subs[3].handler_id, handler_id2);
737    }
738
739    #[rstest]
740    fn test_subscription_pattern_matching() {
741        let msgbus = get_message_bus();
742        let handler1 = get_stub_shareable_handler(Some(Ustr::from("1")));
743        let handler2 = get_stub_shareable_handler(Some(Ustr::from("2")));
744        let handler3 = get_stub_shareable_handler(Some(Ustr::from("3")));
745
746        msgbus::subscribe_any("data.quotes.*".into(), handler1, None);
747        msgbus::subscribe_any("data.trades.*".into(), handler2, None);
748        msgbus::subscribe_any("data.*.BINANCE.*".into(), handler3, None);
749        assert_eq!(msgbus.borrow().subscriptions().len(), 3);
750
751        let topic = "data.quotes.BINANCE.ETHUSDT";
752        assert_eq!(msgbus.borrow().find_topic_matches(topic.into()).len(), 2);
753
754        let matches = msgbus.borrow_mut().matching_subscriptions(topic);
755        assert_eq!(matches.len(), 2);
756        assert_eq!(matches[0].handler_id, Ustr::from("3"));
757        assert_eq!(matches[1].handler_id, Ustr::from("1"));
758    }
759
760    /// A simple reference model for subscription behavior.
761    struct SimpleSubscriptionModel {
762        /// Stores (pattern, handler_id) tuples for active subscriptions.
763        subscriptions: Vec<(String, String)>,
764    }
765
766    impl SimpleSubscriptionModel {
767        fn new() -> Self {
768            Self {
769                subscriptions: Vec::new(),
770            }
771        }
772
773        fn subscribe(&mut self, pattern: &str, handler_id: &str) {
774            let subscription = (pattern.to_string(), handler_id.to_string());
775            if !self.subscriptions.contains(&subscription) {
776                self.subscriptions.push(subscription);
777            }
778        }
779
780        fn unsubscribe(&mut self, pattern: &str, handler_id: &str) -> bool {
781            let subscription = (pattern.to_string(), handler_id.to_string());
782            if let Some(idx) = self.subscriptions.iter().position(|s| s == &subscription) {
783                self.subscriptions.remove(idx);
784                true
785            } else {
786                false
787            }
788        }
789
790        fn is_subscribed(&self, pattern: &str, handler_id: &str) -> bool {
791            self.subscriptions
792                .contains(&(pattern.to_string(), handler_id.to_string()))
793        }
794
795        fn matching_subscriptions(&self, topic: &str) -> Vec<(String, String)> {
796            let topic = topic.into();
797
798            self.subscriptions
799                .iter()
800                .filter(|(pat, _)| is_matching_backtracking(topic, pat.into()))
801                .map(|(pat, id)| (pat.clone(), id.clone()))
802                .collect()
803        }
804
805        fn subscription_count(&self) -> usize {
806            self.subscriptions.len()
807        }
808    }
809
810    #[rstest]
811    fn subscription_model_fuzz_testing() {
812        let mut rng = StdRng::seed_from_u64(42);
813
814        let msgbus = get_message_bus();
815        let mut model = SimpleSubscriptionModel::new();
816
817        // Map from handler_id to handler
818        let mut handlers: Vec<(String, ShareableMessageHandler)> = Vec::new();
819
820        // Generate some patterns
821        let patterns = generate_test_patterns(&mut rng);
822
823        // Generate some handler IDs
824        let handler_ids: Vec<String> = (0..50).map(|i| format!("handler_{i}")).collect();
825
826        // Initialize handlers
827        for id in &handler_ids {
828            let handler = get_stub_shareable_handler(Some(Ustr::from(id)));
829            handlers.push((id.clone(), handler));
830        }
831
832        let num_operations = 50_000;
833        for op_num in 0..num_operations {
834            let operation = rng.random_range(0..4);
835
836            match operation {
837                // Subscribe
838                0 => {
839                    let pattern_idx = rng.random_range(0..patterns.len());
840                    let handler_idx = rng.random_range(0..handlers.len());
841                    let pattern = &patterns[pattern_idx];
842                    let (handler_id, handler) = &handlers[handler_idx];
843
844                    // Apply to reference model
845                    model.subscribe(pattern, handler_id);
846
847                    // Apply to message bus
848                    msgbus::subscribe_any(pattern.as_str().into(), handler.clone(), None);
849
850                    assert_eq!(
851                        model.subscription_count(),
852                        msgbus.borrow().subscriptions().len()
853                    );
854
855                    assert!(
856                        msgbus.borrow().is_subscribed(pattern, handler.clone()),
857                        "Op {op_num}: is_subscribed should return true after subscribe"
858                    );
859                }
860
861                // Unsubscribe
862                1 => {
863                    if model.subscription_count() > 0 {
864                        let sub_idx = rng.random_range(0..model.subscription_count());
865                        let (pattern, handler_id) = model.subscriptions[sub_idx].clone();
866
867                        // Apply to reference model
868                        model.unsubscribe(&pattern, &handler_id);
869
870                        // Find handler
871                        let handler = handlers
872                            .iter()
873                            .find(|(id, _)| id == &handler_id)
874                            .map(|(_, h)| h.clone())
875                            .unwrap();
876
877                        // Apply to message bus
878                        msgbus::unsubscribe_any(pattern.as_str().into(), handler.clone());
879
880                        assert_eq!(
881                            model.subscription_count(),
882                            msgbus.borrow().subscriptions().len()
883                        );
884                        assert!(
885                            !msgbus.borrow().is_subscribed(pattern, handler.clone()),
886                            "Op {op_num}: is_subscribed should return false after unsubscribe"
887                        );
888                    }
889                }
890
891                // Check is_subscribed
892                2 => {
893                    // Get a random pattern and handler
894                    let pattern_idx = rng.random_range(0..patterns.len());
895                    let handler_idx = rng.random_range(0..handlers.len());
896                    let pattern = &patterns[pattern_idx];
897                    let (handler_id, handler) = &handlers[handler_idx];
898
899                    let expected = model.is_subscribed(pattern, handler_id);
900                    let actual = msgbus.borrow().is_subscribed(pattern, handler.clone());
901
902                    assert_eq!(
903                        expected, actual,
904                        "Op {op_num}: Subscription state mismatch for pattern '{pattern}', handler '{handler_id}': expected={expected}, actual={actual}"
905                    );
906                }
907
908                // Check matching_subscriptions
909                3 => {
910                    // Generate a topic
911                    let topic = create_topic(&mut rng);
912
913                    let actual_matches = msgbus.borrow_mut().matching_subscriptions(topic);
914                    let expected_matches = model.matching_subscriptions(&topic);
915
916                    assert_eq!(
917                        expected_matches.len(),
918                        actual_matches.len(),
919                        "Op {}: Match count mismatch for topic '{}': expected={}, actual={}",
920                        op_num,
921                        topic,
922                        expected_matches.len(),
923                        actual_matches.len()
924                    );
925
926                    for sub in &actual_matches {
927                        assert!(
928                            expected_matches
929                                .contains(&(sub.pattern.to_string(), sub.handler_id.to_string())),
930                            "Op {}: Expected match not found: pattern='{}', handler_id='{}'",
931                            op_num,
932                            sub.pattern,
933                            sub.handler_id
934                        );
935                    }
936                }
937                _ => unreachable!(),
938            }
939        }
940    }
941
942    fn generate_pattern_from_topic(topic: &str, rng: &mut StdRng) -> String {
943        let mut pattern = String::new();
944
945        for c in topic.chars() {
946            let val: f64 = rng.random();
947            if val < 0.1 {
948                pattern.push('*');
949            } else if val < 0.3 {
950                pattern.push('?');
951            } else if val < 0.5 {
952                continue;
953            } else {
954                pattern.push(c);
955            };
956        }
957
958        pattern
959    }
960
961    fn generate_test_patterns(rng: &mut StdRng) -> Vec<String> {
962        let mut patterns = vec![
963            "data.*.*.*".to_string(),
964            "*.*.BINANCE.*".to_string(),
965            "events.order.*".to_string(),
966            "data.*.*.?USDT".to_string(),
967            "*.trades.*.BTC*".to_string(),
968            "*.*.*.*".to_string(),
969        ];
970
971        // Add some random patterns
972        for _ in 0..50 {
973            match rng.random_range(0..10) {
974                // Use existing pattern
975                0..=1 => {
976                    let idx = rng.random_range(0..patterns.len());
977                    patterns.push(patterns[idx].clone());
978                }
979                // Generate new pattern from topic
980                _ => {
981                    let topic = create_topic(rng);
982                    let pattern = generate_pattern_from_topic(&topic, rng);
983                    patterns.push(pattern);
984                }
985            }
986        }
987
988        patterns
989    }
990
991    fn create_topic(rng: &mut StdRng) -> Ustr {
992        let cat = ["data", "info", "order"];
993        let model = ["quotes", "trades", "orderbooks", "depths"];
994        let venue = ["BINANCE", "BYBIT", "OKX", "FTX", "KRAKEN"];
995        let instrument = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT"];
996
997        let cat = cat[rng.random_range(0..cat.len())];
998        let model = model[rng.random_range(0..model.len())];
999        let venue = venue[rng.random_range(0..venue.len())];
1000        let instrument = instrument[rng.random_range(0..instrument.len())];
1001        Ustr::from(&format!("{cat}.{model}.{venue}.{instrument}"))
1002    }
1003}