nautilus_common/msgbus/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! A common in-memory `MessageBus` for loosely coupled message passing patterns.
17
18pub mod database;
19pub mod handler;
20pub mod stubs;
21pub mod switchboard;
22
23use std::{
24    any::Any,
25    cell::RefCell,
26    collections::HashMap,
27    fmt::Debug,
28    hash::{Hash, Hasher},
29    rc::Rc,
30    sync::OnceLock,
31};
32
33use handler::ShareableMessageHandler;
34use indexmap::IndexMap;
35use nautilus_core::UUID4;
36use nautilus_model::{data::Data, identifiers::TraderId};
37use switchboard::MessagingSwitchboard;
38use ustr::Ustr;
39
40use crate::messages::data::DataResponse;
41
42pub const CLOSE_TOPIC: &str = "CLOSE";
43
44pub struct MessageBusWrapper(Rc<RefCell<MessageBus>>);
45
46unsafe impl Send for MessageBusWrapper {}
47unsafe impl Sync for MessageBusWrapper {}
48
49static MESSAGE_BUS: OnceLock<MessageBusWrapper> = OnceLock::new();
50
51pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
52    if MESSAGE_BUS.set(MessageBusWrapper(msgbus)).is_err() {
53        panic!("Failed to set MessageBus");
54    }
55}
56
57pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
58    if MESSAGE_BUS.get().is_none() {
59        // Initialize default message bus
60        let msgbus = MessageBus::default();
61        let msgbus = Rc::new(RefCell::new(msgbus));
62        let _ = MESSAGE_BUS.set(MessageBusWrapper(msgbus.clone()));
63        msgbus
64    } else {
65        MESSAGE_BUS.get().unwrap().0.clone()
66    }
67}
68
69pub fn send(endpoint: &Ustr, message: &dyn Any) {
70    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
71    if let Some(handler) = handler {
72        handler.0.handle(message);
73    }
74}
75
76/// Publish a message to a topic.
77pub fn publish(topic: &Ustr, message: &dyn Any) {
78    log::trace!(
79        "Publishing topic '{topic}' {message:?} at {}",
80        get_message_bus().borrow().memory_address()
81    );
82    let matching_subs = get_message_bus().borrow().matching_subscriptions(topic);
83
84    log::trace!("Matched {} subscriptions", matching_subs.len());
85
86    for sub in matching_subs {
87        log::trace!("Matched {sub:?}");
88        sub.handler.0.handle(message);
89    }
90}
91
92/// Registers the given `handler` for the `endpoint` address.
93pub fn register<T: AsRef<str>>(endpoint: T, handler: ShareableMessageHandler) {
94    log::debug!(
95        "Registering endpoint '{}' with handler ID {} at {}",
96        endpoint.as_ref(),
97        handler.0.id(),
98        get_message_bus().borrow().memory_address(),
99    );
100
101    // Updates value if key already exists
102    get_message_bus()
103        .borrow_mut()
104        .endpoints
105        .insert(Ustr::from(endpoint.as_ref()), handler);
106}
107
108/// Deregisters the given `handler` for the `endpoint` address.
109pub fn deregister(endpoint: &Ustr) {
110    log::debug!(
111        "Deregistering endpoint '{endpoint}' at {}",
112        get_message_bus().borrow().memory_address()
113    );
114    // Removes entry if it exists for endpoint
115    get_message_bus()
116        .borrow_mut()
117        .endpoints
118        .shift_remove(endpoint);
119}
120
121/// Subscribes the given `handler` to the `topic`.
122pub fn subscribe<T: AsRef<str>>(topic: T, handler: ShareableMessageHandler, priority: Option<u8>) {
123    log::debug!(
124        "Subscribing for topic '{}' at {}",
125        topic.as_ref(),
126        get_message_bus().borrow().memory_address(),
127    );
128
129    let msgbus = get_message_bus();
130    let mut msgbus_ref_mut = msgbus.borrow_mut();
131
132    let sub = Subscription::new(topic.as_ref(), handler, priority);
133    if msgbus_ref_mut.subscriptions.contains_key(&sub) {
134        log::error!("{sub:?} already exists");
135        return;
136    }
137
138    // Find existing patterns which match this topic
139    let mut matches = Vec::new();
140    for (pattern, subs) in msgbus_ref_mut.patterns.iter_mut() {
141        if is_matching(&Ustr::from(topic.as_ref()), pattern) {
142            subs.push(sub.clone());
143            subs.sort();
144            // subs.sort_by(|a, b| a.priority.cmp(&b.priority).then_with(|| a.cmp(b)));
145            matches.push(*pattern);
146        }
147    }
148
149    matches.sort();
150
151    msgbus_ref_mut.subscriptions.insert(sub, matches);
152}
153
154/// Unsubscribes the given `handler` from the `topic`.
155pub fn unsubscribe<T: AsRef<str>>(topic: T, handler: ShareableMessageHandler) {
156    log::debug!(
157        "Unsubscribing for topic '{}' at {}",
158        topic.as_ref(),
159        get_message_bus().borrow().memory_address(),
160    );
161    let sub = Subscription::new(topic, handler, None);
162    get_message_bus()
163        .borrow_mut()
164        .subscriptions
165        .shift_remove(&sub);
166}
167
168pub fn is_subscribed<T: AsRef<str>>(topic: T, handler: ShareableMessageHandler) -> bool {
169    let sub = Subscription::new(topic, handler, None);
170    get_message_bus().borrow().subscriptions.contains_key(&sub)
171}
172
173pub fn subscriptions_count<T: AsRef<str>>(topic: T) -> usize {
174    get_message_bus().borrow().subscriptions_count(topic)
175}
176
177/// Represents a subscription to a particular topic.
178///
179/// This is an internal class intended to be used by the message bus to organize
180/// topics and their subscribers.
181///
182/// # Warnings
183///
184/// Assigning priority handling is an advanced feature which *shouldn't
185/// normally be needed by most users*. **Only assign a higher priority to the
186/// subscription if you are certain of what you're doing**. If an inappropriate
187/// priority is assigned then the handler may receive messages before core
188/// system components have been able to process necessary calculations and
189/// produce potential side effects for logically sound behavior.
190#[derive(Clone)]
191pub struct Subscription {
192    /// The shareable message handler for the subscription.
193    pub handler: ShareableMessageHandler,
194    /// Store a copy of the handler ID for faster equality checks.
195    pub handler_id: Ustr,
196    /// The topic for the subscription.
197    pub topic: Ustr,
198    /// The priority for the subscription determines the ordering of handlers receiving
199    /// messages being processed, higher priority handlers will receive messages before
200    /// lower priority handlers.
201    pub priority: u8,
202}
203
204impl Subscription {
205    /// Creates a new [`Subscription`] instance.
206    #[must_use]
207    pub fn new<T: AsRef<str>>(
208        topic: T,
209        handler: ShareableMessageHandler,
210        priority: Option<u8>,
211    ) -> Self {
212        let handler_id = handler.0.id();
213
214        Self {
215            handler_id,
216            topic: Ustr::from(topic.as_ref()),
217            handler,
218            priority: priority.unwrap_or(0),
219        }
220    }
221}
222
223impl Debug for Subscription {
224    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225        write!(
226            f,
227            "Subscription {{ topic: {}, handler: {}, priority: {} }}",
228            self.topic, self.handler_id, self.priority
229        )
230    }
231}
232
233impl PartialEq<Self> for Subscription {
234    fn eq(&self, other: &Self) -> bool {
235        self.topic == other.topic && self.handler_id == other.handler_id
236    }
237}
238
239impl Eq for Subscription {}
240
241impl PartialOrd for Subscription {
242    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
243        Some(self.cmp(other))
244    }
245}
246
247impl Ord for Subscription {
248    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
249        other.priority.cmp(&self.priority)
250    }
251}
252
253impl Hash for Subscription {
254    fn hash<H: Hasher>(&self, state: &mut H) {
255        self.topic.hash(state);
256        self.handler_id.hash(state);
257    }
258}
259
260/// A generic message bus to facilitate various messaging patterns.
261///
262/// The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as
263/// well as direct point-to-point messaging to registered endpoints.
264///
265/// Pub/Sub wildcard patterns for hierarchical topics are possible:
266///  - `*` asterisk represents one or more characters in a pattern.
267///  - `?` question mark represents a single character in a pattern.
268///
269/// Given a topic and pattern potentially containing wildcard characters, i.e.
270/// `*` and `?`, where `?` can match any single character in the topic, and `*`
271/// can match any number of characters including zero characters.
272///
273/// The asterisk in a wildcard matches any character zero or more times. For
274/// example, `comp*` matches anything beginning with `comp` which means `comp`,
275/// `complete`, and `computer` are all matched.
276///
277/// A question mark matches a single character once. For example, `c?mp` matches
278/// `camp` and `comp`. The question mark can also be used more than once.
279/// For example, `c??p` would match both of the above examples and `coop`.
280#[cfg_attr(
281    feature = "python",
282    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
283)]
284pub struct MessageBus {
285    /// The trader ID associated with the message bus.
286    pub trader_id: TraderId,
287    /// The instance ID associated with the message bus.
288    pub instance_id: UUID4,
289    /// The name for the message bus.
290    pub name: String,
291    /// If the message bus is backed by a database.
292    pub has_backing: bool,
293    /// The switchboard for built-in endpoints.
294    pub switchboard: MessagingSwitchboard,
295    /// Mapping from topic to the corresponding handler
296    /// a topic can be a string with wildcards
297    /// * '?' - any character
298    /// * '*' - any number of any characters
299    subscriptions: IndexMap<Subscription, Vec<Ustr>>,
300    /// Maps a pattern to all the handlers registered for it
301    /// this is updated whenever a new subscription is created.
302    patterns: IndexMap<Ustr, Vec<Subscription>>,
303    /// Handles a message or a request destined for a specific endpoint.
304    endpoints: IndexMap<Ustr, ShareableMessageHandler>,
305}
306
307// SAFETY: Message bus is not meant to be passed between threads
308unsafe impl Send for MessageBus {}
309unsafe impl Sync for MessageBus {}
310
311impl MessageBus {
312    /// Creates a new [`MessageBus`] instance.
313    #[must_use]
314    pub fn new(
315        trader_id: TraderId,
316        instance_id: UUID4,
317        name: Option<String>,
318        _config: Option<HashMap<String, serde_json::Value>>,
319    ) -> Self {
320        Self {
321            trader_id,
322            instance_id,
323            name: name.unwrap_or(stringify!(MessageBus).to_owned()),
324            switchboard: MessagingSwitchboard::default(),
325            subscriptions: IndexMap::new(),
326            patterns: IndexMap::new(),
327            endpoints: IndexMap::new(),
328            has_backing: false,
329        }
330    }
331
332    /// Returns the message bus instances memory address.
333    #[must_use]
334    pub fn memory_address(&self) -> String {
335        format!("{:?}", std::ptr::from_ref(self))
336    }
337
338    /// Returns the registered endpoint addresses.
339    #[must_use]
340    pub fn endpoints(&self) -> Vec<&str> {
341        self.endpoints.keys().map(Ustr::as_str).collect()
342    }
343
344    /// Returns the topics for active subscriptions.
345    #[must_use]
346    pub fn topics(&self) -> Vec<&str> {
347        self.subscriptions
348            .keys()
349            .map(|s| s.topic.as_str())
350            .collect()
351    }
352
353    /// Returns whether there are subscribers for the given `pattern`.
354    #[must_use]
355    pub fn has_subscribers<T: AsRef<str>>(&self, pattern: T) -> bool {
356        self.matching_handlers(&Ustr::from(pattern.as_ref()))
357            .next()
358            .is_some()
359    }
360
361    /// Returns the count of subscribers for the given `pattern`.
362    #[must_use]
363    pub fn subscriptions_count<T: AsRef<str>>(&self, pattern: T) -> usize {
364        self.matching_subscriptions(&Ustr::from(pattern.as_ref()))
365            .len()
366    }
367
368    /// Returns whether there are subscribers for the given `pattern`.
369    #[must_use]
370    pub fn subscriptions(&self) -> Vec<&Subscription> {
371        self.subscriptions.keys().collect()
372    }
373
374    /// Returns whether there are subscribers for the given `pattern`.
375    #[must_use]
376    pub fn subscription_handler_ids(&self) -> Vec<&str> {
377        self.subscriptions
378            .keys()
379            .map(|s| s.handler_id.as_str())
380            .collect()
381    }
382
383    /// Returns whether there is a registered endpoint for the given `pattern`.
384    #[must_use]
385    pub fn is_registered<T: AsRef<str>>(&self, endpoint: T) -> bool {
386        self.endpoints.contains_key(&Ustr::from(endpoint.as_ref()))
387    }
388
389    /// Returns whether there are subscribers for the given `pattern`.
390    #[must_use]
391    pub fn is_subscribed<T: AsRef<str>>(&self, topic: T, handler: ShareableMessageHandler) -> bool {
392        let sub = Subscription::new(topic, handler, None);
393        self.subscriptions.contains_key(&sub)
394    }
395
396    /// Close the message bus which will close the sender channel and join the thread.
397    pub const fn close(&self) -> anyhow::Result<()> {
398        // TODO: Integrate the backing database
399        Ok(())
400    }
401    /// Returns the handler for the given `endpoint`.
402    #[must_use]
403    pub fn get_endpoint<T: AsRef<str>>(&self, endpoint: T) -> Option<&ShareableMessageHandler> {
404        self.endpoints.get(&Ustr::from(endpoint.as_ref()))
405    }
406
407    #[must_use]
408    pub fn matching_subscriptions(&self, pattern: &Ustr) -> Vec<Subscription> {
409        let mut matching_subs: Vec<Subscription> = Vec::new();
410
411        // Collect matching subscriptions from direct subscriptions
412        matching_subs.extend(self.subscriptions.iter().filter_map(|(sub, _)| {
413            if is_matching(&sub.topic, pattern) {
414                Some(sub.clone())
415            } else {
416                None
417            }
418        }));
419
420        // Collect matching subscriptions from pattern-based subscriptions
421        // TODO: Improve efficiency of this
422        for subs in self.patterns.values() {
423            let filtered_subs: Vec<Subscription> = subs.to_vec();
424
425            matching_subs.extend(filtered_subs);
426        }
427
428        // Sort into priority order
429        matching_subs.sort();
430        matching_subs
431    }
432
433    fn matching_handlers<'a>(
434        &'a self,
435        pattern: &'a Ustr,
436    ) -> impl Iterator<Item = &'a ShareableMessageHandler> {
437        self.subscriptions.iter().filter_map(move |(sub, _)| {
438            if is_matching(&sub.topic, pattern) {
439                Some(&sub.handler)
440            } else {
441                None
442            }
443        })
444    }
445}
446
447/// Data specific functions.
448impl MessageBus {
449    // /// Send a [`DataRequest`] to an endpoint that must be a data client implementation.
450    // pub fn send_data_request(&self, message: DataRequest) {
451    //     // TODO: log error
452    //     if let Some(client) = self.get_client(&message.client_id, message.venue) {
453    //         let _ = client.request(message);
454    //     }
455    // }
456    //
457    // /// Send a [`SubscriptionCommand`] to an endpoint that must be a data client implementation.
458    // pub fn send_subscription_command(&self, message: SubscriptionCommand) {
459    //     if let Some(client) = self.get_client(&message.client_id, message.venue) {
460    //         client.through_execute(message);
461    //     }
462    // }
463
464    /// Send a [`DataResponse`] to an endpoint that must be an actor.
465    pub fn send_response(&self, message: DataResponse) {
466        if let Some(handler) = self.get_endpoint(message.client_id.inner()) {
467            handler.0.handle(&message);
468        }
469    }
470
471    /// Publish [`Data`] to a topic.
472    pub fn publish_data(&self, topic: &Ustr, message: Data) {
473        let matching_subs = self.matching_subscriptions(topic);
474
475        for sub in matching_subs {
476            sub.handler.0.handle(&message);
477        }
478    }
479
480    /// Register message bus globally
481    pub fn register_message_bus(self) -> Rc<RefCell<MessageBus>> {
482        let msgbus = Rc::new(RefCell::new(self));
483        set_message_bus(msgbus.clone());
484        msgbus
485    }
486}
487
488/// Match a topic and a string pattern
489/// pattern can contains -
490/// '*' - match 0 or more characters after this
491/// '?' - match any character once
492/// 'a-z' - match the specific character
493#[must_use]
494pub fn is_matching(topic: &Ustr, pattern: &Ustr) -> bool {
495    let mut table = [[false; 256]; 256];
496    table[0][0] = true;
497
498    let m = pattern.len();
499    let n = topic.len();
500
501    pattern.chars().enumerate().for_each(|(j, c)| {
502        if c == '*' {
503            table[0][j + 1] = table[0][j];
504        }
505    });
506
507    topic.chars().enumerate().for_each(|(i, tc)| {
508        pattern.chars().enumerate().for_each(|(j, pc)| {
509            if pc == '*' {
510                table[i + 1][j + 1] = table[i][j + 1] || table[i + 1][j];
511            } else if pc == '?' || tc == pc {
512                table[i + 1][j + 1] = table[i][j];
513            }
514        });
515    });
516
517    table[n][m]
518}
519
520impl Default for MessageBus {
521    /// Creates a new default [`MessageBus`] instance.
522    fn default() -> Self {
523        Self::new(TraderId::from("TRADER-001"), UUID4::new(), None, None)
524    }
525}
526
527////////////////////////////////////////////////////////////////////////////////
528// Tests
529////////////////////////////////////////////////////////////////////////////////
530#[cfg(test)]
531pub(crate) mod tests {
532
533    use nautilus_core::UUID4;
534    use rstest::*;
535    use stubs::check_handler_was_called;
536
537    use super::*;
538    use crate::msgbus::stubs::{get_call_check_shareable_handler, get_stub_shareable_handler};
539
540    #[rstest]
541    fn test_new() {
542        let trader_id = TraderId::from("trader-001");
543        let msgbus = MessageBus::new(trader_id, UUID4::new(), None, None);
544
545        assert_eq!(msgbus.trader_id, trader_id);
546        assert_eq!(msgbus.name, stringify!(MessageBus));
547    }
548
549    #[rstest]
550    fn test_endpoints_when_no_endpoints() {
551        let msgbus = get_message_bus();
552        assert!(msgbus.borrow().endpoints().is_empty());
553    }
554
555    #[rstest]
556    fn test_topics_when_no_subscriptions() {
557        let msgbus = get_message_bus();
558        assert!(msgbus.borrow().topics().is_empty());
559        assert!(!msgbus.borrow().has_subscribers("my-topic"));
560    }
561
562    #[rstest]
563    fn test_is_subscribed_when_no_subscriptions() {
564        let msgbus = get_message_bus();
565        let handler = get_stub_shareable_handler(None);
566
567        assert!(!msgbus.borrow().is_subscribed("my-topic", handler));
568    }
569
570    #[rstest]
571    fn test_is_registered_when_no_registrations() {
572        let msgbus = get_message_bus();
573        assert!(!msgbus.borrow().is_registered("MyEndpoint"));
574    }
575
576    #[rstest]
577    fn test_regsiter_endpoint() {
578        let msgbus = get_message_bus();
579        let endpoint = "MyEndpoint";
580        let handler = get_stub_shareable_handler(None);
581
582        register(endpoint, handler);
583
584        assert_eq!(msgbus.borrow().endpoints(), vec![endpoint.to_string()]);
585        assert!(msgbus.borrow().get_endpoint(endpoint).is_some());
586    }
587
588    #[rstest]
589    fn test_endpoint_send() {
590        let msgbus = get_message_bus();
591        let endpoint = Ustr::from("MyEndpoint");
592        let handler = get_call_check_shareable_handler(None);
593
594        register(endpoint, handler.clone());
595        assert!(msgbus.borrow().get_endpoint(endpoint).is_some());
596        assert!(!check_handler_was_called(handler.clone()));
597
598        // Send a message to the endpoint
599        send(&endpoint, &"Test Message");
600        assert!(check_handler_was_called(handler));
601    }
602
603    #[rstest]
604    fn test_deregsiter_endpoint() {
605        let msgbus = get_message_bus();
606        let endpoint = Ustr::from("MyEndpoint");
607        let handler = get_stub_shareable_handler(None);
608
609        register(endpoint, handler);
610        deregister(&endpoint);
611
612        assert!(msgbus.borrow().endpoints().is_empty());
613    }
614
615    #[rstest]
616    fn test_subscribe() {
617        let msgbus = get_message_bus();
618        let topic = "my-topic";
619        let handler = get_stub_shareable_handler(None);
620
621        subscribe(topic, handler, Some(1));
622
623        assert!(msgbus.borrow().has_subscribers(topic));
624        assert_eq!(msgbus.borrow().topics(), vec![topic]);
625    }
626
627    #[rstest]
628    fn test_unsubscribe() {
629        let msgbus = get_message_bus();
630        let topic = "my-topic";
631        let handler = get_stub_shareable_handler(None);
632
633        subscribe(topic, handler.clone(), None);
634        unsubscribe(topic, handler);
635
636        assert!(!msgbus.borrow().has_subscribers(topic));
637        assert!(msgbus.borrow().topics().is_empty());
638    }
639
640    #[rstest]
641    fn test_matching_subscriptions() {
642        let msgbus = get_message_bus();
643        let topic = "my-topic";
644
645        let handler_id1 = Ustr::from("1");
646        let handler1 = get_stub_shareable_handler(Some(handler_id1));
647
648        let handler_id2 = Ustr::from("2");
649        let handler2 = get_stub_shareable_handler(Some(handler_id2));
650
651        let handler_id3 = Ustr::from("3");
652        let handler3 = get_stub_shareable_handler(Some(handler_id3));
653
654        let handler_id4 = Ustr::from("4");
655        let handler4 = get_stub_shareable_handler(Some(handler_id4));
656
657        subscribe(topic, handler1, None);
658        subscribe(topic, handler2, None);
659        subscribe(topic, handler3, Some(1));
660        subscribe(topic, handler4, Some(2));
661        let topic = Ustr::from(topic);
662
663        let subs = msgbus.borrow().matching_subscriptions(&topic);
664        assert_eq!(subs.len(), 4);
665        assert_eq!(subs[0].handler_id, handler_id4);
666        assert_eq!(subs[1].handler_id, handler_id3);
667        assert_eq!(subs[2].handler_id, handler_id1);
668        assert_eq!(subs[3].handler_id, handler_id2);
669    }
670
671    #[rstest]
672    #[case("*", "*", true)]
673    #[case("a", "*", true)]
674    #[case("a", "a", true)]
675    #[case("a", "b", false)]
676    #[case("data.quotes.BINANCE", "data.*", true)]
677    #[case("data.quotes.BINANCE", "data.quotes*", true)]
678    #[case("data.quotes.BINANCE", "data.*.BINANCE", true)]
679    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.*", true)]
680    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ETH*", true)]
681    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ETH???", false)]
682    #[case("data.trades.BINANCE.ETHUSD", "data.*.BINANCE.ETH???", true)]
683    // We don't support [seq] style pattern
684    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[HC]USDT", false)]
685    // We don't support [!seq] style pattern
686    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[!ABC]USDT", false)]
687    // We don't support [^seq] style pattern
688    #[case("data.trades.BINANCE.ETHUSDT", "data.*.BINANCE.ET[^ABC]USDT", false)]
689    fn test_is_matching(#[case] topic: &str, #[case] pattern: &str, #[case] expected: bool) {
690        assert_eq!(
691            is_matching(&Ustr::from(topic), &Ustr::from(pattern)),
692            expected
693        );
694    }
695}