Skip to main content

nautilus_common/msgbus/
api.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Public API functions for interacting with the message bus.
17//!
18//! This module provides free-standing functions that wrap the thread-local
19//! message bus, providing a convenient API for:
20//!
21//! - Registering endpoint handlers (point-to-point messaging).
22//! - Subscribing to topics (pub/sub messaging).
23//! - Publishing messages to subscribers.
24//! - Sending messages to endpoints.
25
26use std::{any::Any, cell::RefCell, rc::Rc, thread::LocalKey};
27
28use nautilus_core::UUID4;
29#[cfg(feature = "defi")]
30use nautilus_model::defi::{
31    Block, DefiData, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
32};
33use nautilus_model::{
34    data::{
35        Bar, Data, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate,
36        OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
37    },
38    events::{AccountState, OrderEventAny, PositionEvent},
39    orderbook::OrderBook,
40    orders::OrderAny,
41    position::Position,
42};
43use smallvec::SmallVec;
44use ustr::Ustr;
45
46use super::{
47    ACCOUNT_STATE_HANDLERS, ANY_HANDLERS, BAR_HANDLERS, BOOK_HANDLERS, DELTAS_HANDLERS,
48    DEPTH10_HANDLERS, FUNDING_RATE_HANDLERS, GREEKS_HANDLERS, HANDLER_BUFFER_CAP,
49    INDEX_PRICE_HANDLERS, MARK_PRICE_HANDLERS, MESSAGE_BUS, ORDER_EVENT_HANDLERS,
50    POSITION_EVENT_HANDLERS, QUOTE_HANDLERS, TRADE_HANDLERS,
51    core::{MessageBus, Subscription},
52    get_message_bus,
53    matching::is_matching_backtracking,
54    mstr::{Endpoint, MStr, Pattern, Topic},
55    typed_handler::{ShareableMessageHandler, TypedHandler, TypedIntoHandler},
56};
57#[cfg(feature = "defi")]
58use super::{
59    DEFI_BLOCK_HANDLERS, DEFI_COLLECT_HANDLERS, DEFI_FLASH_HANDLERS, DEFI_LIQUIDITY_HANDLERS,
60    DEFI_POOL_HANDLERS, DEFI_SWAP_HANDLERS,
61};
62use crate::messages::{
63    data::{DataCommand, DataResponse},
64    execution::{ExecutionReport, TradingCommand},
65};
66
67/// Registers a handler for an endpoint using runtime type dispatch (Any).
68pub fn register_any(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
69    log::debug!(
70        "Registering endpoint '{endpoint}' with handler ID {}",
71        handler.0.id(),
72    );
73    get_message_bus()
74        .borrow_mut()
75        .endpoints
76        .insert(endpoint, handler);
77}
78
79/// Registers a response handler for a correlation ID.
80pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
81    if let Err(e) = get_message_bus()
82        .borrow_mut()
83        .register_response_handler(correlation_id, handler)
84    {
85        log::error!("Failed to register request handler: {e}");
86    }
87}
88
89/// Registers a quote tick handler at an endpoint.
90pub fn register_quote_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<QuoteTick>) {
91    get_message_bus()
92        .borrow_mut()
93        .endpoints_quotes
94        .register(endpoint, handler);
95}
96
97/// Registers a trade tick handler at an endpoint.
98pub fn register_trade_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<TradeTick>) {
99    get_message_bus()
100        .borrow_mut()
101        .endpoints_trades
102        .register(endpoint, handler);
103}
104
105/// Registers a bar handler at an endpoint.
106pub fn register_bar_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<Bar>) {
107    get_message_bus()
108        .borrow_mut()
109        .endpoints_bars
110        .register(endpoint, handler);
111}
112
113/// Registers an order event handler at an endpoint (ownership-based).
114pub fn register_order_event_endpoint(
115    endpoint: MStr<Endpoint>,
116    handler: TypedIntoHandler<OrderEventAny>,
117) {
118    get_message_bus()
119        .borrow_mut()
120        .endpoints_order_events
121        .register(endpoint, handler);
122}
123
124/// Registers an account state handler at an endpoint.
125pub fn register_account_state_endpoint(
126    endpoint: MStr<Endpoint>,
127    handler: TypedHandler<AccountState>,
128) {
129    get_message_bus()
130        .borrow_mut()
131        .endpoints_account_state
132        .register(endpoint, handler);
133}
134
135/// Registers a trading command handler at an endpoint (ownership-based).
136pub fn register_trading_command_endpoint(
137    endpoint: MStr<Endpoint>,
138    handler: TypedIntoHandler<TradingCommand>,
139) {
140    get_message_bus()
141        .borrow_mut()
142        .endpoints_trading_commands
143        .register(endpoint, handler);
144}
145
146/// Registers a data command handler at an endpoint (ownership-based).
147pub fn register_data_command_endpoint(
148    endpoint: MStr<Endpoint>,
149    handler: TypedIntoHandler<DataCommand>,
150) {
151    get_message_bus()
152        .borrow_mut()
153        .endpoints_data_commands
154        .register(endpoint, handler);
155}
156
157/// Registers a data response handler at an endpoint (ownership-based).
158pub fn register_data_response_endpoint(
159    endpoint: MStr<Endpoint>,
160    handler: TypedIntoHandler<DataResponse>,
161) {
162    get_message_bus()
163        .borrow_mut()
164        .endpoints_data_responses
165        .register(endpoint, handler);
166}
167
168/// Registers an execution report handler at an endpoint (ownership-based).
169pub fn register_execution_report_endpoint(
170    endpoint: MStr<Endpoint>,
171    handler: TypedIntoHandler<ExecutionReport>,
172) {
173    get_message_bus()
174        .borrow_mut()
175        .endpoints_exec_reports
176        .register(endpoint, handler);
177}
178
179/// Registers a data handler at an endpoint (ownership-based).
180pub fn register_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<Data>) {
181    get_message_bus()
182        .borrow_mut()
183        .endpoints_data
184        .register(endpoint, handler);
185}
186
/// Registers `handler` as the DeFi data handler for `endpoint`.
///
/// The handler is a `TypedIntoHandler` (ownership-based).
#[cfg(feature = "defi")]
pub fn register_defi_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<DefiData>) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    bus.endpoints_defi_data.register(endpoint, handler);
}
195
196/// Deregisters the handler for an endpoint (Any-based).
197pub fn deregister_any(endpoint: MStr<Endpoint>) {
198    log::debug!("Deregistering endpoint '{endpoint}'");
199    get_message_bus()
200        .borrow_mut()
201        .endpoints
202        .shift_remove(&endpoint);
203}
204
205/// Subscribes a handler to a pattern using runtime type dispatch (Any).
206///
207/// # Warnings
208///
209/// Assigning priority handling is an advanced feature which *shouldn't
210/// normally be needed by most users*. **Only assign a higher priority to the
211/// subscription if you are certain of what you're doing**. If an inappropriate
212/// priority is assigned then the handler may receive messages before core
213/// system components have been able to process necessary calculations and
214/// produce potential side effects for logically sound behavior.
215pub fn subscribe_any(
216    pattern: MStr<Pattern>,
217    handler: ShareableMessageHandler,
218    priority: Option<u8>,
219) {
220    let msgbus = get_message_bus();
221    let mut msgbus_ref_mut = msgbus.borrow_mut();
222    let sub = Subscription::new(pattern, handler, priority);
223
224    log::debug!(
225        "Subscribing {:?} for pattern '{}'",
226        sub.handler,
227        sub.pattern
228    );
229
230    if msgbus_ref_mut.subscriptions.contains(&sub) {
231        log::warn!("{sub:?} already exists");
232        return;
233    }
234
235    for (topic, subs) in &mut msgbus_ref_mut.topics {
236        if is_matching_backtracking(*topic, sub.pattern) {
237            subs.push(sub.clone());
238            subs.sort();
239            log::debug!("Added subscription for '{topic}'");
240        }
241    }
242
243    msgbus_ref_mut.subscriptions.insert(sub);
244}
245
246/// Subscribes a handler to instrument messages matching a pattern.
247pub fn subscribe_instruments(
248    pattern: MStr<Pattern>,
249    handler: ShareableMessageHandler,
250    priority: Option<u8>,
251) {
252    subscribe_any(pattern, handler, priority);
253}
254
255/// Subscribes a handler to instrument close messages matching a pattern.
256pub fn subscribe_instrument_close(
257    pattern: MStr<Pattern>,
258    handler: ShareableMessageHandler,
259    priority: Option<u8>,
260) {
261    subscribe_any(pattern, handler, priority);
262}
263
264/// Subscribes a handler to order book deltas matching a pattern.
265pub fn subscribe_book_deltas(
266    pattern: MStr<Pattern>,
267    handler: TypedHandler<OrderBookDeltas>,
268    priority: Option<u8>,
269) {
270    get_message_bus()
271        .borrow_mut()
272        .router_deltas
273        .subscribe(pattern, handler, priority.unwrap_or(0));
274}
275
276/// Subscribes a handler to order book depth10 snapshots matching a pattern.
277pub fn subscribe_book_depth10(
278    pattern: MStr<Pattern>,
279    handler: TypedHandler<OrderBookDepth10>,
280    priority: Option<u8>,
281) {
282    get_message_bus().borrow_mut().router_depth10.subscribe(
283        pattern,
284        handler,
285        priority.unwrap_or(0),
286    );
287}
288
289/// Subscribes a handler to order book snapshots matching a pattern.
290pub fn subscribe_book_snapshots(
291    pattern: MStr<Pattern>,
292    handler: TypedHandler<OrderBook>,
293    priority: Option<u8>,
294) {
295    get_message_bus()
296        .borrow_mut()
297        .router_book_snapshots
298        .subscribe(pattern, handler, priority.unwrap_or(0));
299}
300
301/// Subscribes a handler to quote ticks matching a pattern.
302pub fn subscribe_quotes(
303    pattern: MStr<Pattern>,
304    handler: TypedHandler<QuoteTick>,
305    priority: Option<u8>,
306) {
307    get_message_bus()
308        .borrow_mut()
309        .router_quotes
310        .subscribe(pattern, handler, priority.unwrap_or(0));
311}
312
313/// Subscribes a handler to trade ticks matching a pattern.
314pub fn subscribe_trades(
315    pattern: MStr<Pattern>,
316    handler: TypedHandler<TradeTick>,
317    priority: Option<u8>,
318) {
319    get_message_bus()
320        .borrow_mut()
321        .router_trades
322        .subscribe(pattern, handler, priority.unwrap_or(0));
323}
324
325/// Subscribes a handler to bars matching a pattern.
326pub fn subscribe_bars(pattern: MStr<Pattern>, handler: TypedHandler<Bar>, priority: Option<u8>) {
327    get_message_bus()
328        .borrow_mut()
329        .router_bars
330        .subscribe(pattern, handler, priority.unwrap_or(0));
331}
332
333/// Subscribes a handler to mark price updates matching a pattern.
334pub fn subscribe_mark_prices(
335    pattern: MStr<Pattern>,
336    handler: TypedHandler<MarkPriceUpdate>,
337    priority: Option<u8>,
338) {
339    get_message_bus().borrow_mut().router_mark_prices.subscribe(
340        pattern,
341        handler,
342        priority.unwrap_or(0),
343    );
344}
345
346/// Subscribes a handler to index price updates matching a pattern.
347pub fn subscribe_index_prices(
348    pattern: MStr<Pattern>,
349    handler: TypedHandler<IndexPriceUpdate>,
350    priority: Option<u8>,
351) {
352    get_message_bus()
353        .borrow_mut()
354        .router_index_prices
355        .subscribe(pattern, handler, priority.unwrap_or(0));
356}
357
358/// Subscribes a handler to funding rate updates matching a pattern.
359pub fn subscribe_funding_rates(
360    pattern: MStr<Pattern>,
361    handler: TypedHandler<FundingRateUpdate>,
362    priority: Option<u8>,
363) {
364    get_message_bus()
365        .borrow_mut()
366        .router_funding_rates
367        .subscribe(pattern, handler, priority.unwrap_or(0));
368}
369
370/// Subscribes a handler to greeks data matching a pattern.
371pub fn subscribe_greeks(
372    pattern: MStr<Pattern>,
373    handler: TypedHandler<GreeksData>,
374    priority: Option<u8>,
375) {
376    get_message_bus()
377        .borrow_mut()
378        .router_greeks
379        .subscribe(pattern, handler, priority.unwrap_or(0));
380}
381
382/// Subscribes a handler to order events matching a pattern.
383pub fn subscribe_order_events(
384    pattern: MStr<Pattern>,
385    handler: TypedHandler<OrderEventAny>,
386    priority: Option<u8>,
387) {
388    get_message_bus()
389        .borrow_mut()
390        .router_order_events
391        .subscribe(pattern, handler, priority.unwrap_or(0));
392}
393
394/// Subscribes a handler to position events matching a pattern.
395pub fn subscribe_position_events(
396    pattern: MStr<Pattern>,
397    handler: TypedHandler<PositionEvent>,
398    priority: Option<u8>,
399) {
400    get_message_bus()
401        .borrow_mut()
402        .router_position_events
403        .subscribe(pattern, handler, priority.unwrap_or(0));
404}
405
406/// Subscribes a handler to account state updates matching a pattern.
407pub fn subscribe_account_state(
408    pattern: MStr<Pattern>,
409    handler: TypedHandler<AccountState>,
410    priority: Option<u8>,
411) {
412    get_message_bus()
413        .borrow_mut()
414        .router_account_state
415        .subscribe(pattern, handler, priority.unwrap_or(0));
416}
417
418/// Subscribes a handler to positions matching a pattern.
419pub fn subscribe_positions(
420    pattern: MStr<Pattern>,
421    handler: TypedHandler<Position>,
422    priority: Option<u8>,
423) {
424    get_message_bus().borrow_mut().router_positions.subscribe(
425        pattern,
426        handler,
427        priority.unwrap_or(0),
428    );
429}
430
/// Subscribes `handler` to DeFi blocks on topics matching `pattern`.
///
/// A `priority` of `None` is treated as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_blocks(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Block>,
    priority: Option<u8>,
) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    let priority = priority.unwrap_or(0);
    bus.router_defi_blocks.subscribe(pattern, handler, priority);
}
444
/// Subscribes `handler` to DeFi pools on topics matching `pattern`.
///
/// A `priority` of `None` is treated as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_pools(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Pool>,
    priority: Option<u8>,
) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    let priority = priority.unwrap_or(0);
    bus.router_defi_pools.subscribe(pattern, handler, priority);
}
458
/// Subscribes `handler` to DeFi pool swaps on topics matching `pattern`.
///
/// A `priority` of `None` is treated as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_swaps(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolSwap>,
    priority: Option<u8>,
) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    let priority = priority.unwrap_or(0);
    bus.router_defi_swaps.subscribe(pattern, handler, priority);
}
472
/// Subscribes `handler` to DeFi liquidity updates on topics matching `pattern`.
///
/// A `priority` of `None` is treated as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_liquidity(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolLiquidityUpdate>,
    priority: Option<u8>,
) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    let priority = priority.unwrap_or(0);
    bus.router_defi_liquidity
        .subscribe(pattern, handler, priority);
}
485
/// Subscribes `handler` to DeFi fee collects on topics matching `pattern`.
///
/// A `priority` of `None` is treated as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_collects(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolFeeCollect>,
    priority: Option<u8>,
) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    let priority = priority.unwrap_or(0);
    bus.router_defi_collects
        .subscribe(pattern, handler, priority);
}
498
/// Subscribes `handler` to DeFi flash loans on topics matching `pattern`.
///
/// A `priority` of `None` is treated as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_flash(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolFlash>,
    priority: Option<u8>,
) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    let priority = priority.unwrap_or(0);
    bus.router_defi_flash.subscribe(pattern, handler, priority);
}
512
513/// Unsubscribes a handler from instrument messages.
514pub fn unsubscribe_instruments(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
515    unsubscribe_any(pattern, handler);
516}
517
518/// Unsubscribes a handler from instrument close messages.
519pub fn unsubscribe_instrument_close(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
520    unsubscribe_any(pattern, handler);
521}
522
523/// Unsubscribes a handler from order book deltas.
524pub fn unsubscribe_book_deltas(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDeltas>) {
525    get_message_bus()
526        .borrow_mut()
527        .router_deltas
528        .unsubscribe(pattern, handler);
529}
530
531/// Unsubscribes a handler from order book depth10 snapshots.
532pub fn unsubscribe_book_depth10(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDepth10>) {
533    get_message_bus()
534        .borrow_mut()
535        .router_depth10
536        .unsubscribe(pattern, handler);
537}
538
539/// Unsubscribes a handler from order book snapshots.
540pub fn unsubscribe_book_snapshots(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBook>) {
541    get_message_bus()
542        .borrow_mut()
543        .router_book_snapshots
544        .unsubscribe(pattern, handler);
545}
546
547/// Unsubscribes a handler from quote ticks.
548pub fn unsubscribe_quotes(pattern: MStr<Pattern>, handler: &TypedHandler<QuoteTick>) {
549    get_message_bus()
550        .borrow_mut()
551        .router_quotes
552        .unsubscribe(pattern, handler);
553}
554
555/// Unsubscribes a handler from trade ticks.
556pub fn unsubscribe_trades(pattern: MStr<Pattern>, handler: &TypedHandler<TradeTick>) {
557    get_message_bus()
558        .borrow_mut()
559        .router_trades
560        .unsubscribe(pattern, handler);
561}
562
563/// Unsubscribes a handler from bars.
564pub fn unsubscribe_bars(pattern: MStr<Pattern>, handler: &TypedHandler<Bar>) {
565    get_message_bus()
566        .borrow_mut()
567        .router_bars
568        .unsubscribe(pattern, handler);
569}
570
571/// Unsubscribes a handler from mark price updates.
572pub fn unsubscribe_mark_prices(pattern: MStr<Pattern>, handler: &TypedHandler<MarkPriceUpdate>) {
573    get_message_bus()
574        .borrow_mut()
575        .router_mark_prices
576        .unsubscribe(pattern, handler);
577}
578
579/// Unsubscribes a handler from index price updates.
580pub fn unsubscribe_index_prices(pattern: MStr<Pattern>, handler: &TypedHandler<IndexPriceUpdate>) {
581    get_message_bus()
582        .borrow_mut()
583        .router_index_prices
584        .unsubscribe(pattern, handler);
585}
586
587/// Unsubscribes a handler from funding rate updates.
588pub fn unsubscribe_funding_rates(
589    pattern: MStr<Pattern>,
590    handler: &TypedHandler<FundingRateUpdate>,
591) {
592    get_message_bus()
593        .borrow_mut()
594        .router_funding_rates
595        .unsubscribe(pattern, handler);
596}
597
598/// Unsubscribes a handler from account state updates.
599pub fn unsubscribe_account_state(pattern: MStr<Pattern>, handler: &TypedHandler<AccountState>) {
600    get_message_bus()
601        .borrow_mut()
602        .router_account_state
603        .unsubscribe(pattern, handler);
604}
605
606/// Unsubscribes a handler from order events.
607pub fn unsubscribe_order_events(pattern: MStr<Pattern>, handler: &TypedHandler<OrderEventAny>) {
608    get_message_bus()
609        .borrow_mut()
610        .router_order_events
611        .unsubscribe(pattern, handler);
612}
613
614/// Unsubscribes a handler from position events.
615pub fn unsubscribe_position_events(pattern: MStr<Pattern>, handler: &TypedHandler<PositionEvent>) {
616    get_message_bus()
617        .borrow_mut()
618        .router_position_events
619        .unsubscribe(pattern, handler);
620}
621
622/// Removes a specific order event handler by pattern and handler ID.
623pub fn remove_order_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
624    get_message_bus()
625        .borrow_mut()
626        .router_order_events
627        .remove_handler(pattern, handler_id);
628}
629
630/// Removes a specific position event handler by pattern and handler ID.
631pub fn remove_position_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
632    get_message_bus()
633        .borrow_mut()
634        .router_position_events
635        .remove_handler(pattern, handler_id);
636}
637
638/// Unsubscribes a handler from orders.
639pub fn unsubscribe_orders(pattern: MStr<Pattern>, handler: &TypedHandler<OrderAny>) {
640    get_message_bus()
641        .borrow_mut()
642        .router_orders
643        .unsubscribe(pattern, handler);
644}
645
646/// Unsubscribes a handler from positions.
647pub fn unsubscribe_positions(pattern: MStr<Pattern>, handler: &TypedHandler<Position>) {
648    get_message_bus()
649        .borrow_mut()
650        .router_positions
651        .unsubscribe(pattern, handler);
652}
653
654/// Unsubscribes a handler from greeks data.
655pub fn unsubscribe_greeks(pattern: MStr<Pattern>, handler: &TypedHandler<GreeksData>) {
656    get_message_bus()
657        .borrow_mut()
658        .router_greeks
659        .unsubscribe(pattern, handler);
660}
661
/// Unsubscribes `handler` from DeFi blocks for `pattern`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_blocks(pattern: MStr<Pattern>, handler: &TypedHandler<Block>) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    bus.router_defi_blocks.unsubscribe(pattern, handler);
}
670
/// Unsubscribes `handler` from DeFi pools for `pattern`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_pools(pattern: MStr<Pattern>, handler: &TypedHandler<Pool>) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    bus.router_defi_pools.unsubscribe(pattern, handler);
}
679
/// Unsubscribes `handler` from DeFi pool swaps for `pattern`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_swaps(pattern: MStr<Pattern>, handler: &TypedHandler<PoolSwap>) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    bus.router_defi_swaps.unsubscribe(pattern, handler);
}
688
/// Unsubscribes `handler` from DeFi liquidity updates for `pattern`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_liquidity(
    pattern: MStr<Pattern>,
    handler: &TypedHandler<PoolLiquidityUpdate>,
) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    bus.router_defi_liquidity.unsubscribe(pattern, handler);
}
700
/// Unsubscribes `handler` from DeFi fee collects for `pattern`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_collects(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFeeCollect>) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    bus.router_defi_collects.unsubscribe(pattern, handler);
}
709
/// Unsubscribes `handler` from DeFi flash loans for `pattern`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_flash(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFlash>) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();
    bus.router_defi_flash.unsubscribe(pattern, handler);
}
718
719/// Unsubscribes a handler from a pattern (Any-based).
720pub fn unsubscribe_any(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
721    log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
722
723    let handler_id = handler.0.id();
724    let bus_rc = get_message_bus();
725    let mut bus = bus_rc.borrow_mut();
726
727    let count_before = bus.subscriptions.len();
728
729    bus.topics.values_mut().for_each(|subs| {
730        subs.retain(|s| !(s.pattern == pattern && s.handler_id == handler_id));
731    });
732
733    bus.subscriptions
734        .retain(|s| !(s.pattern == pattern && s.handler_id == handler_id));
735
736    let removed = bus.subscriptions.len() < count_before;
737
738    if removed {
739        log::debug!("Handler for pattern '{pattern}' was removed");
740    } else {
741        log::debug!("No matching handler for pattern '{pattern}' was found");
742    }
743}
744
745/// Checks if a handler is subscribed to a pattern (Any-based).
746pub fn is_subscribed_any<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
747    let pattern = MStr::from(pattern.as_ref());
748    let sub = Subscription::new(pattern, handler, None);
749    get_message_bus().borrow().subscriptions.contains(&sub)
750}
751
752/// Returns the count of Any-based subscriptions for a topic.
753pub fn subscriptions_count_any<S: AsRef<str>>(topic: S) -> usize {
754    get_message_bus().borrow().subscriptions_count(topic)
755}
756
757/// Returns the subscriber count for order book deltas on a topic.
758pub fn subscriber_count_deltas(topic: MStr<Topic>) -> usize {
759    get_message_bus()
760        .borrow()
761        .router_deltas
762        .subscriber_count(topic)
763}
764
765/// Returns the subscriber count for order book depth10 on a topic.
766pub fn subscriber_count_depth10(topic: MStr<Topic>) -> usize {
767    get_message_bus()
768        .borrow()
769        .router_depth10
770        .subscriber_count(topic)
771}
772
773/// Returns the subscriber count for order book snapshots on a topic.
774pub fn subscriber_count_book_snapshots(topic: MStr<Topic>) -> usize {
775    get_message_bus()
776        .borrow()
777        .router_book_snapshots
778        .subscriber_count(topic)
779}
780
781/// Returns the exact subscriber count for bars on a topic,
782/// excluding wildcard pattern subscriptions.
783pub fn exact_subscriber_count_bars(topic: MStr<Topic>) -> usize {
784    get_message_bus()
785        .borrow()
786        .router_bars
787        .exact_subscriber_count(topic)
788}
789
790/// Publishes a message to the topic using runtime type dispatch (Any).
791pub fn publish_any(topic: MStr<Topic>, message: &dyn Any) {
792    // SAFETY: Take buffer (re-entrancy safe)
793    let mut handlers = ANY_HANDLERS.with_borrow_mut(std::mem::take);
794
795    get_message_bus()
796        .borrow_mut()
797        .fill_matching_any_handlers(topic, &mut handlers);
798
799    for handler in &handlers {
800        handler.0.handle(message);
801    }
802
803    handlers.clear(); // Release refs before restore
804    ANY_HANDLERS.with_borrow_mut(|buf| *buf = handlers);
805}
806
807/// Publishes order book deltas to subscribers on a topic.
808pub fn publish_deltas(topic: MStr<Topic>, deltas: &OrderBookDeltas) {
809    publish_typed(
810        &DELTAS_HANDLERS,
811        |bus, h| bus.router_deltas.fill_matching_handlers(topic, h),
812        deltas,
813    );
814}
815
816/// Publishes order book depth10 to subscribers on a topic.
817pub fn publish_depth10(topic: MStr<Topic>, depth: &OrderBookDepth10) {
818    publish_typed(
819        &DEPTH10_HANDLERS,
820        |bus, h| bus.router_depth10.fill_matching_handlers(topic, h),
821        depth,
822    );
823}
824
825/// Publishes an order book snapshot to subscribers on a topic.
826pub fn publish_book(topic: MStr<Topic>, book: &OrderBook) {
827    publish_typed(
828        &BOOK_HANDLERS,
829        |bus, h| bus.router_book_snapshots.fill_matching_handlers(topic, h),
830        book,
831    );
832}
833
834/// Publishes a quote tick to subscribers on a topic.
835pub fn publish_quote(topic: MStr<Topic>, quote: &QuoteTick) {
836    publish_typed(
837        &QUOTE_HANDLERS,
838        |bus, h| bus.router_quotes.fill_matching_handlers(topic, h),
839        quote,
840    );
841}
842
843/// Publishes a trade tick to subscribers on a topic.
844pub fn publish_trade(topic: MStr<Topic>, trade: &TradeTick) {
845    publish_typed(
846        &TRADE_HANDLERS,
847        |bus, h| bus.router_trades.fill_matching_handlers(topic, h),
848        trade,
849    );
850}
851
852/// Publishes a bar to subscribers on a topic.
853pub fn publish_bar(topic: MStr<Topic>, bar: &Bar) {
854    publish_typed(
855        &BAR_HANDLERS,
856        |bus, h| bus.router_bars.fill_matching_handlers(topic, h),
857        bar,
858    );
859}
860
861/// Publishes a mark price update to subscribers on a topic.
862pub fn publish_mark_price(topic: MStr<Topic>, mark_price: &MarkPriceUpdate) {
863    publish_typed(
864        &MARK_PRICE_HANDLERS,
865        |bus, h| bus.router_mark_prices.fill_matching_handlers(topic, h),
866        mark_price,
867    );
868}
869
870/// Publishes an index price update to subscribers on a topic.
871pub fn publish_index_price(topic: MStr<Topic>, index_price: &IndexPriceUpdate) {
872    publish_typed(
873        &INDEX_PRICE_HANDLERS,
874        |bus, h| bus.router_index_prices.fill_matching_handlers(topic, h),
875        index_price,
876    );
877}
878
879/// Publishes a funding rate update to subscribers on a topic.
880pub fn publish_funding_rate(topic: MStr<Topic>, funding_rate: &FundingRateUpdate) {
881    publish_typed(
882        &FUNDING_RATE_HANDLERS,
883        |bus, h| bus.router_funding_rates.fill_matching_handlers(topic, h),
884        funding_rate,
885    );
886}
887
888/// Publishes greeks data to subscribers on a topic.
889pub fn publish_greeks(topic: MStr<Topic>, greeks: &GreeksData) {
890    publish_typed(
891        &GREEKS_HANDLERS,
892        |bus, h| bus.router_greeks.fill_matching_handlers(topic, h),
893        greeks,
894    );
895}
896
897/// Publishes an account state to subscribers on a topic.
898pub fn publish_account_state(topic: MStr<Topic>, state: &AccountState) {
899    publish_typed(
900        &ACCOUNT_STATE_HANDLERS,
901        |bus, h| bus.router_account_state.fill_matching_handlers(topic, h),
902        state,
903    );
904}
905
906/// Publishes an order event to subscribers on a topic.
907pub fn publish_order_event(topic: MStr<Topic>, event: &OrderEventAny) {
908    publish_typed(
909        &ORDER_EVENT_HANDLERS,
910        |bus, h| bus.router_order_events.fill_matching_handlers(topic, h),
911        event,
912    );
913}
914
915/// Publishes a position event to subscribers on a topic.
916pub fn publish_position_event(topic: MStr<Topic>, event: &PositionEvent) {
917    publish_typed(
918        &POSITION_EVENT_HANDLERS,
919        |bus, h| bus.router_position_events.fill_matching_handlers(topic, h),
920        event,
921    );
922}
923
/// Publishes the DeFi `block` to subscribers on `topic`.
///
/// Handlers are gathered from `router_defi_blocks` into the
/// `DEFI_BLOCK_HANDLERS` thread-local buffer via `publish_typed`.
#[cfg(feature = "defi")]
pub fn publish_defi_block(topic: MStr<Topic>, block: &Block) {
    publish_typed(
        &DEFI_BLOCK_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_blocks
                .fill_matching_handlers(topic, matched);
        },
        block,
    );
}
933
/// Publishes the DeFi `pool` to subscribers on `topic`.
///
/// Handlers are gathered from `router_defi_pools` into the
/// `DEFI_POOL_HANDLERS` thread-local buffer via `publish_typed`.
#[cfg(feature = "defi")]
pub fn publish_defi_pool(topic: MStr<Topic>, pool: &Pool) {
    publish_typed(
        &DEFI_POOL_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_pools
                .fill_matching_handlers(topic, matched);
        },
        pool,
    );
}
943
/// Publishes the DeFi pool `swap` to subscribers on `topic`.
///
/// Handlers are gathered from `router_defi_swaps` into the
/// `DEFI_SWAP_HANDLERS` thread-local buffer via `publish_typed`.
#[cfg(feature = "defi")]
pub fn publish_defi_swap(topic: MStr<Topic>, swap: &PoolSwap) {
    publish_typed(
        &DEFI_SWAP_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_swaps
                .fill_matching_handlers(topic, matched);
        },
        swap,
    );
}
953
/// Delivers a DeFi [`PoolLiquidityUpdate`] to the handlers subscribed on `topic`.
///
/// The handler buffer is populated through `router_defi_liquidity` and the
/// message is then dispatched by the shared typed publish path.
#[cfg(feature = "defi")]
pub fn publish_defi_liquidity(topic: MStr<Topic>, update: &PoolLiquidityUpdate) {
    publish_typed(
        &DEFI_LIQUIDITY_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_liquidity
                .fill_matching_handlers(topic, matched);
        },
        update,
    );
}
963
/// Delivers a DeFi [`PoolFeeCollect`] to the handlers subscribed on `topic`.
///
/// The handler buffer is populated through `router_defi_collects` and the
/// message is then dispatched by the shared typed publish path.
#[cfg(feature = "defi")]
pub fn publish_defi_collect(topic: MStr<Topic>, collect: &PoolFeeCollect) {
    publish_typed(
        &DEFI_COLLECT_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_collects
                .fill_matching_handlers(topic, matched);
        },
        collect,
    );
}
973
/// Delivers a DeFi [`PoolFlash`] to the handlers subscribed on `topic`.
///
/// The handler buffer is populated through `router_defi_flash` and the
/// message is then dispatched by the shared typed publish path.
#[cfg(feature = "defi")]
pub fn publish_defi_flash(topic: MStr<Topic>, flash: &PoolFlash) {
    publish_typed(
        &DEFI_FLASH_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_flash
                .fill_matching_handlers(topic, matched);
        },
        flash,
    );
}
983
984/// Publishes a message to typed handlers using thread-local buffer reuse.
985///
986/// The `fill_fn` receives a mutable reference to the MessageBus, avoiding
987/// redundant TLS access and Rc clone/drop overhead per publish.
988///
989/// # Invariants
990///
991/// - `fill_fn` must not call any publish path (would panic from RefCell double-borrow).
992/// - Handler panics drop the buffer, losing reuse optimization (acceptable as panics are fatal).
993#[inline]
994fn publish_typed<T: 'static>(
995    tls: &'static LocalKey<RefCell<SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>>>,
996    fill_fn: impl FnOnce(&mut MessageBus, &mut SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>),
997    message: &T,
998) {
999    // SAFETY: Take buffer (re-entrancy safe)
1000    let mut handlers = tls.with_borrow_mut(std::mem::take);
1001
1002    // Borrow scope ends before dispatch to support re-entrant publishes
1003    MESSAGE_BUS.with(|cell| {
1004        let rc = cell.get_or_init(|| Rc::new(RefCell::new(MessageBus::default())));
1005        fill_fn(&mut rc.borrow_mut(), &mut handlers);
1006    });
1007
1008    for handler in &handlers {
1009        handler.handle(message);
1010    }
1011
1012    handlers.clear(); // Release refs before restore
1013    tls.with_borrow_mut(|buf| *buf = handlers);
1014}
1015
1016/// Sends a message to an endpoint handler using runtime type dispatch (Any).
1017pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
1018    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
1019
1020    if let Some(handler) = handler {
1021        handler.0.handle(message);
1022    } else {
1023        log::error!("send_any: no registered endpoint '{endpoint}'");
1024    }
1025}
1026
1027/// Sends a message to an endpoint, converting to Any (convenience wrapper).
1028pub fn send_any_value<T: 'static>(endpoint: MStr<Endpoint>, message: T) {
1029    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
1030
1031    if let Some(handler) = handler {
1032        handler.0.handle(&message);
1033    } else {
1034        log::error!("send_any_value: no registered endpoint '{endpoint}'");
1035    }
1036}
1037
1038/// Sends the [`DataResponse`] to the registered correlation ID handler.
1039pub fn send_response(correlation_id: &UUID4, message: DataResponse) {
1040    let handler = get_message_bus()
1041        .borrow()
1042        .get_response_handler(correlation_id)
1043        .cloned();
1044
1045    if let Some(handler) = handler {
1046        match &message {
1047            DataResponse::Data(resp) => handler.0.handle(resp),
1048            DataResponse::Instrument(resp) => handler.0.handle(resp.as_ref()),
1049            DataResponse::Instruments(resp) => handler.0.handle(resp),
1050            DataResponse::Book(resp) => handler.0.handle(resp),
1051            DataResponse::Quotes(resp) => handler.0.handle(resp),
1052            DataResponse::Trades(resp) => handler.0.handle(resp),
1053            DataResponse::FundingRates(resp) => handler.0.handle(resp),
1054            DataResponse::Bars(resp) => handler.0.handle(resp),
1055        }
1056    } else {
1057        log::error!("send_response: handler not found for correlation_id '{correlation_id}'");
1058    }
1059}
1060
1061/// Sends a quote tick to an endpoint handler.
1062pub fn send_quote(endpoint: MStr<Endpoint>, quote: &QuoteTick) {
1063    send_endpoint_ref(
1064        endpoint,
1065        quote,
1066        |bus| bus.endpoints_quotes.get(endpoint),
1067        "send_quote",
1068    );
1069}
1070
1071/// Sends a trade tick to an endpoint handler.
1072pub fn send_trade(endpoint: MStr<Endpoint>, trade: &TradeTick) {
1073    send_endpoint_ref(
1074        endpoint,
1075        trade,
1076        |bus| bus.endpoints_trades.get(endpoint),
1077        "send_trade",
1078    );
1079}
1080
1081/// Sends a bar to an endpoint handler.
1082pub fn send_bar(endpoint: MStr<Endpoint>, bar: &Bar) {
1083    send_endpoint_ref(
1084        endpoint,
1085        bar,
1086        |bus| bus.endpoints_bars.get(endpoint),
1087        "send_bar",
1088    );
1089}
1090
1091/// Sends an order event to an endpoint handler, transferring ownership.
1092pub fn send_order_event(endpoint: MStr<Endpoint>, event: OrderEventAny) {
1093    send_endpoint_owned(
1094        endpoint,
1095        event,
1096        |bus| bus.endpoints_order_events.get(endpoint),
1097        "send_order_event",
1098    );
1099}
1100
1101/// Sends an account state to an endpoint handler.
1102pub fn send_account_state(endpoint: MStr<Endpoint>, state: &AccountState) {
1103    send_endpoint_ref(
1104        endpoint,
1105        state,
1106        |bus| bus.endpoints_account_state.get(endpoint),
1107        "send_account_state",
1108    );
1109}
1110
1111/// Sends a trading command to an endpoint handler, transferring ownership.
1112pub fn send_trading_command(endpoint: MStr<Endpoint>, command: TradingCommand) {
1113    send_endpoint_owned(
1114        endpoint,
1115        command,
1116        |bus| bus.endpoints_trading_commands.get(endpoint),
1117        "send_trading_command",
1118    );
1119}
1120
1121/// Sends a data command to an endpoint handler, transferring ownership.
1122pub fn send_data_command(endpoint: MStr<Endpoint>, command: DataCommand) {
1123    send_endpoint_owned(
1124        endpoint,
1125        command,
1126        |bus| bus.endpoints_data_commands.get(endpoint),
1127        "send_data_command",
1128    );
1129}
1130
1131/// Sends a data response to an endpoint handler, transferring ownership.
1132pub fn send_data_response(endpoint: MStr<Endpoint>, response: DataResponse) {
1133    send_endpoint_owned(
1134        endpoint,
1135        response,
1136        |bus| bus.endpoints_data_responses.get(endpoint),
1137        "send_data_response",
1138    );
1139}
1140
1141/// Sends an execution report to an endpoint handler, transferring ownership.
1142pub fn send_execution_report(endpoint: MStr<Endpoint>, report: ExecutionReport) {
1143    send_endpoint_owned(
1144        endpoint,
1145        report,
1146        |bus| bus.endpoints_exec_reports.get(endpoint),
1147        "send_execution_report",
1148    );
1149}
1150
1151/// Sends data to an endpoint handler, transferring ownership.
1152pub fn send_data(endpoint: MStr<Endpoint>, data: Data) {
1153    send_endpoint_owned(
1154        endpoint,
1155        data,
1156        |bus| bus.endpoints_data.get(endpoint),
1157        "send_data",
1158    );
1159}
1160
/// Moves a [`DefiData`] value into the handler registered at `endpoint`.
///
/// The handler is looked up in `endpoints_defi_data`; a missing registration is
/// logged by the shared send helper under the name `send_defi_data`.
#[cfg(feature = "defi")]
pub fn send_defi_data(endpoint: MStr<Endpoint>, data: DefiData) {
    send_endpoint_owned(
        endpoint,
        data,
        |msgbus| msgbus.endpoints_defi_data.get(endpoint),
        "send_defi_data",
    );
}
1171
1172#[inline]
1173fn send_endpoint_ref<T: 'static, F>(
1174    endpoint: MStr<Endpoint>,
1175    message: &T,
1176    get_handler: F,
1177    fn_name: &str,
1178) where
1179    F: FnOnce(&MessageBus) -> Option<&TypedHandler<T>>,
1180{
1181    let handler = {
1182        let bus = get_message_bus();
1183        get_handler(&bus.borrow()).cloned()
1184    };
1185
1186    if let Some(handler) = handler {
1187        handler.handle(message);
1188    } else {
1189        log::error!("{fn_name}: no registered endpoint '{endpoint}'");
1190    }
1191}
1192
1193#[inline]
1194fn send_endpoint_owned<T: 'static, F>(
1195    endpoint: MStr<Endpoint>,
1196    message: T,
1197    get_handler: F,
1198    fn_name: &str,
1199) where
1200    F: FnOnce(&MessageBus) -> Option<&TypedIntoHandler<T>>,
1201{
1202    let handler = {
1203        let bus = get_message_bus();
1204        get_handler(&bus.borrow()).cloned()
1205    };
1206
1207    if let Some(handler) = handler {
1208        handler.handle(message);
1209    } else {
1210        log::error!("{fn_name}: no registered endpoint '{endpoint}'");
1211    }
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216    //! Tests for the message bus API functions.
1217    //!
1218    //! Includes re-entrancy tests that verify handlers can call back into the
1219    //! message bus without causing RefCell borrow conflicts. This is the scenario
1220    //! where `send_*` holds a borrow, calls the handler, and the handler needs to
1221    //! call `borrow_mut()` for topic getters or other operations.
1222
1223    use std::{cell::RefCell, rc::Rc};
1224
1225    use nautilus_core::UUID4;
1226    use nautilus_model::{
1227        data::{Bar, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
1228        enums::OrderSide,
1229        events::OrderDenied,
1230        identifiers::{ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId},
1231    };
1232    use rstest::rstest;
1233
1234    use super::*;
1235    use crate::messages::{
1236        data::{DataCommand, SubscribeCommand, SubscribeQuotes},
1237        execution::{CancelAllOrders, TradingCommand},
1238    };
1239
1240    #[rstest]
1241    fn test_typed_quote_publish_subscribe_integration() {
1242        let _msgbus = get_message_bus();
1243        let received = Rc::new(RefCell::new(Vec::new()));
1244        let received_clone = received.clone();
1245
1246        let handler = TypedHandler::from(move |quote: &QuoteTick| {
1247            received_clone.borrow_mut().push(*quote);
1248        });
1249
1250        subscribe_quotes("data.quotes.*".into(), handler, None);
1251
1252        let quote = QuoteTick::default();
1253        publish_quote("data.quotes.TEST".into(), &quote);
1254        publish_quote("data.quotes.TEST".into(), &quote);
1255
1256        assert_eq!(received.borrow().len(), 2);
1257    }
1258
1259    #[rstest]
1260    fn test_typed_trade_publish_subscribe_integration() {
1261        let _msgbus = get_message_bus();
1262        let received = Rc::new(RefCell::new(Vec::new()));
1263        let received_clone = received.clone();
1264
1265        let handler = TypedHandler::from(move |trade: &TradeTick| {
1266            received_clone.borrow_mut().push(*trade);
1267        });
1268
1269        subscribe_trades("data.trades.*".into(), handler, None);
1270
1271        let trade = TradeTick::default();
1272        publish_trade("data.trades.TEST".into(), &trade);
1273
1274        assert_eq!(received.borrow().len(), 1);
1275    }
1276
1277    #[rstest]
1278    fn test_typed_bar_publish_subscribe_integration() {
1279        let _msgbus = get_message_bus();
1280        let received = Rc::new(RefCell::new(Vec::new()));
1281        let received_clone = received.clone();
1282
1283        let handler = TypedHandler::from(move |bar: &Bar| {
1284            received_clone.borrow_mut().push(*bar);
1285        });
1286
1287        subscribe_bars("data.bars.*".into(), handler, None);
1288
1289        let bar = Bar::default();
1290        publish_bar("data.bars.TEST".into(), &bar);
1291
1292        assert_eq!(received.borrow().len(), 1);
1293    }
1294
1295    #[rstest]
1296    fn test_typed_deltas_publish_subscribe_integration() {
1297        let _msgbus = get_message_bus();
1298        let received = Rc::new(RefCell::new(Vec::new()));
1299        let received_clone = received.clone();
1300
1301        let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1302            received_clone.borrow_mut().push(deltas.clone());
1303        });
1304
1305        subscribe_book_deltas("data.book.deltas.*".into(), handler, None);
1306
1307        let instrument_id = InstrumentId::from("TEST.VENUE");
1308        let delta = OrderBookDelta::clear(instrument_id, 0, 1.into(), 2.into());
1309        let deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1310        publish_deltas("data.book.deltas.TEST".into(), &deltas);
1311
1312        assert_eq!(received.borrow().len(), 1);
1313    }
1314
1315    #[rstest]
1316    fn test_typed_unsubscribe_stops_delivery() {
1317        let _msgbus = get_message_bus();
1318        let received = Rc::new(RefCell::new(Vec::new()));
1319        let received_clone = received.clone();
1320
1321        let handler = TypedHandler::from_with_id("unsub-test", move |quote: &QuoteTick| {
1322            received_clone.borrow_mut().push(*quote);
1323        });
1324
1325        subscribe_quotes("data.quotes.UNSUB".into(), handler.clone(), None);
1326
1327        let quote = QuoteTick::default();
1328        publish_quote("data.quotes.UNSUB".into(), &quote);
1329        assert_eq!(received.borrow().len(), 1);
1330
1331        unsubscribe_quotes("data.quotes.UNSUB".into(), &handler);
1332
1333        publish_quote("data.quotes.UNSUB".into(), &quote);
1334        assert_eq!(received.borrow().len(), 1);
1335    }
1336
1337    #[rstest]
1338    fn test_typed_wildcard_pattern_matching() {
1339        let _msgbus = get_message_bus();
1340        let received = Rc::new(RefCell::new(Vec::new()));
1341        let received_clone = received.clone();
1342
1343        let handler = TypedHandler::from(move |quote: &QuoteTick| {
1344            received_clone.borrow_mut().push(*quote);
1345        });
1346
1347        subscribe_quotes("data.quotes.WILD.*".into(), handler, None);
1348
1349        let quote = QuoteTick::default();
1350        publish_quote("data.quotes.WILD.AAPL".into(), &quote);
1351        publish_quote("data.quotes.WILD.MSFT".into(), &quote);
1352        publish_quote("data.quotes.OTHER.AAPL".into(), &quote);
1353
1354        assert_eq!(received.borrow().len(), 2);
1355    }
1356
1357    #[rstest]
1358    fn test_typed_priority_ordering() {
1359        let _msgbus = get_message_bus();
1360        let order = Rc::new(RefCell::new(Vec::new()));
1361
1362        let order1 = order.clone();
1363        let handler_low = TypedHandler::from_with_id("low-priority", move |_: &QuoteTick| {
1364            order1.borrow_mut().push("low");
1365        });
1366
1367        let order2 = order.clone();
1368        let handler_high = TypedHandler::from_with_id("high-priority", move |_: &QuoteTick| {
1369            order2.borrow_mut().push("high");
1370        });
1371
1372        subscribe_quotes("data.quotes.PRIO.*".into(), handler_low, Some(1));
1373        subscribe_quotes("data.quotes.PRIO.*".into(), handler_high, Some(10));
1374
1375        let quote = QuoteTick::default();
1376        publish_quote("data.quotes.PRIO.TEST".into(), &quote);
1377
1378        assert_eq!(*order.borrow(), vec!["high", "low"]);
1379    }
1380
1381    #[rstest]
1382    fn test_typed_routing_isolation() {
1383        let _msgbus = get_message_bus();
1384        let quote_received = Rc::new(RefCell::new(false));
1385        let trade_received = Rc::new(RefCell::new(false));
1386
1387        let qr = quote_received.clone();
1388        let quote_handler = TypedHandler::from(move |_: &QuoteTick| {
1389            *qr.borrow_mut() = true;
1390        });
1391
1392        let tr = trade_received.clone();
1393        let trade_handler = TypedHandler::from(move |_: &TradeTick| {
1394            *tr.borrow_mut() = true;
1395        });
1396
1397        subscribe_quotes("data.iso.*".into(), quote_handler, None);
1398        subscribe_trades("data.iso.*".into(), trade_handler, None);
1399
1400        let quote = QuoteTick::default();
1401        publish_quote("data.iso.TEST".into(), &quote);
1402
1403        assert!(*quote_received.borrow());
1404        assert!(!*trade_received.borrow());
1405    }
1406
1407    #[rstest]
1408    fn test_send_data_allows_reentrant_topic_access() {
1409        use crate::msgbus::switchboard::get_quotes_topic;
1410
1411        let _msgbus = get_message_bus();
1412        let topic_retrieved = Rc::new(RefCell::new(false));
1413        let topic_clone = topic_retrieved.clone();
1414
1415        let handler = TypedIntoHandler::from(move |data: Data| {
1416            let instrument_id = data.instrument_id();
1417            let _topic = get_quotes_topic(instrument_id);
1418            *topic_clone.borrow_mut() = true;
1419        });
1420
1421        let endpoint: MStr<Endpoint> = "ReentrantTest.data".into();
1422        register_data_endpoint(endpoint, handler);
1423
1424        let quote = QuoteTick::default();
1425        send_data(endpoint, Data::Quote(quote));
1426
1427        assert!(*topic_retrieved.borrow());
1428    }
1429
1430    #[rstest]
1431    fn test_send_trading_command_allows_reentrant_topic_access() {
1432        use nautilus_model::{
1433            enums::OrderSide,
1434            identifiers::{StrategyId, TraderId},
1435        };
1436
1437        use crate::{
1438            messages::execution::{TradingCommand, cancel::CancelAllOrders},
1439            msgbus::switchboard::get_trades_topic,
1440        };
1441
1442        let _msgbus = get_message_bus();
1443        let topic_retrieved = Rc::new(RefCell::new(false));
1444        let topic_clone = topic_retrieved.clone();
1445
1446        let handler = TypedIntoHandler::from(move |cmd: TradingCommand| {
1447            let instrument_id = cmd.instrument_id();
1448            let _topic = get_trades_topic(instrument_id);
1449            *topic_clone.borrow_mut() = true;
1450        });
1451
1452        let endpoint: MStr<Endpoint> = "ReentrantTest.tradingCmd".into();
1453        register_trading_command_endpoint(endpoint, handler);
1454
1455        let cmd = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1456            TraderId::new("TESTER-001"),
1457            None,
1458            StrategyId::new("S-001"),
1459            InstrumentId::from("TEST.VENUE"),
1460            OrderSide::NoOrderSide,
1461            UUID4::new(),
1462            0.into(),
1463            None,
1464        ));
1465        send_trading_command(endpoint, cmd);
1466
1467        assert!(*topic_retrieved.borrow());
1468    }
1469
1470    #[rstest]
1471    fn test_send_account_state_allows_reentrant_topic_access() {
1472        use nautilus_model::{enums::AccountType, identifiers::AccountId, types::Currency};
1473
1474        use crate::msgbus::switchboard::get_quotes_topic;
1475
1476        let _msgbus = get_message_bus();
1477        let topic_retrieved = Rc::new(RefCell::new(false));
1478        let topic_clone = topic_retrieved.clone();
1479
1480        let handler = TypedHandler::from(move |_state: &AccountState| {
1481            let instrument_id = InstrumentId::from("TEST.VENUE");
1482            let _topic = get_quotes_topic(instrument_id);
1483            *topic_clone.borrow_mut() = true;
1484        });
1485
1486        let endpoint: MStr<Endpoint> = "ReentrantTest.accountState".into();
1487        register_account_state_endpoint(endpoint, handler);
1488
1489        let state = AccountState::new(
1490            AccountId::new("SIM-001"),
1491            AccountType::Cash,
1492            vec![],
1493            vec![],
1494            true,
1495            UUID4::new(),
1496            0.into(),
1497            0.into(),
1498            Some(Currency::USD()),
1499        );
1500        send_account_state(endpoint, &state);
1501
1502        assert!(*topic_retrieved.borrow());
1503    }
1504
1505    #[rstest]
1506    fn test_send_order_event_allows_reentrant_topic_access() {
1507        use nautilus_model::{
1508            events::OrderDenied,
1509            identifiers::{ClientOrderId, StrategyId, TraderId},
1510        };
1511
1512        use crate::msgbus::switchboard::get_quotes_topic;
1513
1514        let _msgbus = get_message_bus();
1515        let topic_retrieved = Rc::new(RefCell::new(false));
1516        let topic_clone = topic_retrieved.clone();
1517
1518        let handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1519            let instrument_id = InstrumentId::from("TEST.VENUE");
1520            let _topic = get_quotes_topic(instrument_id);
1521            *topic_clone.borrow_mut() = true;
1522        });
1523
1524        let endpoint: MStr<Endpoint> = "ReentrantTest.orderEvent".into();
1525        register_order_event_endpoint(endpoint, handler);
1526
1527        let event = OrderEventAny::Denied(OrderDenied::new(
1528            TraderId::new("TESTER-001"),
1529            StrategyId::new("S-001"),
1530            InstrumentId::from("TEST.VENUE"),
1531            ClientOrderId::new("O-001"),
1532            "test denied".into(),
1533            UUID4::new(),
1534            0.into(),
1535            0.into(),
1536        ));
1537        send_order_event(endpoint, event);
1538
1539        assert!(*topic_retrieved.borrow());
1540    }
1541
1542    #[rstest]
1543    fn test_send_data_command_allows_reentrant_topic_access() {
1544        use nautilus_model::identifiers::ClientId;
1545
1546        use crate::{
1547            messages::data::{DataCommand, SubscribeCommand, SubscribeQuotes},
1548            msgbus::switchboard::get_trades_topic,
1549        };
1550
1551        let _msgbus = get_message_bus();
1552        let topic_retrieved = Rc::new(RefCell::new(false));
1553        let topic_clone = topic_retrieved.clone();
1554
1555        let handler = TypedIntoHandler::from(move |_cmd: DataCommand| {
1556            let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
1557            *topic_clone.borrow_mut() = true;
1558        });
1559
1560        let endpoint: MStr<Endpoint> = "ReentrantTest.dataCmd".into();
1561        register_data_command_endpoint(endpoint, handler);
1562
1563        let cmd = DataCommand::Subscribe(SubscribeCommand::Quotes(SubscribeQuotes::new(
1564            InstrumentId::from("TEST.VENUE"),
1565            Some(ClientId::new("SIM")),
1566            None,
1567            UUID4::new(),
1568            0.into(),
1569            None,
1570            None,
1571        )));
1572        send_data_command(endpoint, cmd);
1573
1574        assert!(*topic_retrieved.borrow());
1575    }
1576
1577    #[rstest]
1578    fn test_send_data_response_allows_reentrant_topic_access() {
1579        use nautilus_model::identifiers::ClientId;
1580
1581        use crate::{
1582            messages::data::{DataResponse, QuotesResponse},
1583            msgbus::switchboard::get_quotes_topic,
1584        };
1585
1586        let _msgbus = get_message_bus();
1587        let topic_retrieved = Rc::new(RefCell::new(false));
1588        let topic_clone = topic_retrieved.clone();
1589
1590        let handler = TypedIntoHandler::from(move |_resp: DataResponse| {
1591            let _topic = get_quotes_topic(InstrumentId::from("TEST.VENUE"));
1592            *topic_clone.borrow_mut() = true;
1593        });
1594
1595        let endpoint: MStr<Endpoint> = "ReentrantTest.dataResp".into();
1596        register_data_response_endpoint(endpoint, handler);
1597
1598        let resp = DataResponse::Quotes(QuotesResponse {
1599            correlation_id: UUID4::new(),
1600            client_id: ClientId::new("SIM"),
1601            instrument_id: InstrumentId::from("TEST.VENUE"),
1602            data: vec![],
1603            start: None,
1604            end: None,
1605            ts_init: 0.into(),
1606            params: None,
1607        });
1608        send_data_response(endpoint, resp);
1609
1610        assert!(*topic_retrieved.borrow());
1611    }
1612
1613    #[rstest]
1614    fn test_send_execution_report_allows_reentrant_topic_access() {
1615        use nautilus_model::{
1616            identifiers::{AccountId, ClientId, Venue},
1617            reports::ExecutionMassStatus,
1618        };
1619
1620        use crate::{messages::execution::ExecutionReport, msgbus::switchboard::get_trades_topic};
1621
1622        let _msgbus = get_message_bus();
1623        let topic_retrieved = Rc::new(RefCell::new(false));
1624        let topic_clone = topic_retrieved.clone();
1625
1626        let handler = TypedIntoHandler::from(move |_report: ExecutionReport| {
1627            let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
1628            *topic_clone.borrow_mut() = true;
1629        });
1630
1631        let endpoint: MStr<Endpoint> = "ReentrantTest.execReport".into();
1632        register_execution_report_endpoint(endpoint, handler);
1633
1634        let report = ExecutionReport::MassStatus(Box::new(ExecutionMassStatus::new(
1635            ClientId::new("SIM"),
1636            AccountId::new("SIM-001"),
1637            Venue::new("TEST"),
1638            0.into(),
1639            None,
1640        )));
1641        send_execution_report(endpoint, report);
1642
1643        assert!(*topic_retrieved.borrow());
1644    }
1645
1646    #[rstest]
1647    fn test_order_event_handler_can_send_trading_command() {
1648        // Tests that a handler processing an order event can send a trading command
1649        // without causing a borrow conflict. This simulates the scenario where a
1650        // strategy's on_order_accepted() handler calls cancel_order().
1651        let _msgbus = get_message_bus();
1652        let command_sent = Rc::new(RefCell::new(false));
1653        let command_sent_clone = command_sent.clone();
1654
1655        let cmd_received = Rc::new(RefCell::new(false));
1656        let cmd_received_clone = cmd_received.clone();
1657        let cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1658            *cmd_received_clone.borrow_mut() = true;
1659        });
1660        let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.execCmd".into();
1661        register_trading_command_endpoint(cmd_endpoint, cmd_handler);
1662
1663        let event_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1664            // Simulate strategy calling cancel_order from on_order_accepted
1665            let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1666                TraderId::new("TESTER-001"),
1667                None,
1668                StrategyId::new("S-001"),
1669                InstrumentId::from("TEST.VENUE"),
1670                OrderSide::Buy,
1671                UUID4::new(),
1672                0.into(),
1673                None,
1674            ));
1675            send_trading_command(cmd_endpoint, command);
1676            *command_sent_clone.borrow_mut() = true;
1677        });
1678
1679        let event_endpoint: MStr<Endpoint> = "ReentrantTest.orderEvt".into();
1680        register_order_event_endpoint(event_endpoint, event_handler);
1681
1682        let event = OrderEventAny::Denied(OrderDenied::new(
1683            TraderId::new("TESTER-001"),
1684            StrategyId::new("S-001"),
1685            InstrumentId::from("TEST.VENUE"),
1686            ClientOrderId::new("O-001"),
1687            "Test denial".into(),
1688            UUID4::new(),
1689            0.into(),
1690            0.into(),
1691        ));
1692        send_order_event(event_endpoint, event);
1693
1694        assert!(
1695            *command_sent.borrow(),
1696            "Order event handler should have run"
1697        );
1698        assert!(
1699            *cmd_received.borrow(),
1700            "Trading command should have been received"
1701        );
1702    }
1703
1704    #[rstest]
1705    fn test_data_handler_can_send_data_command() {
1706        // Tests that a handler processing data can send a data command
1707        // without causing a borrow conflict.
1708        let _msgbus = get_message_bus();
1709        let command_sent = Rc::new(RefCell::new(false));
1710        let command_sent_clone = command_sent.clone();
1711
1712        let cmd_received = Rc::new(RefCell::new(false));
1713        let cmd_received_clone = cmd_received.clone();
1714        let cmd_handler = TypedIntoHandler::from(move |_cmd: DataCommand| {
1715            *cmd_received_clone.borrow_mut() = true;
1716        });
1717        let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.dataCmd2".into();
1718        register_data_command_endpoint(cmd_endpoint, cmd_handler);
1719
1720        let data_handler = TypedIntoHandler::from(move |_data: Data| {
1721            let command = DataCommand::Subscribe(SubscribeCommand::Quotes(SubscribeQuotes::new(
1722                InstrumentId::from("TEST.VENUE"),
1723                Some(ClientId::new("SIM")),
1724                None,
1725                UUID4::new(),
1726                0.into(),
1727                None,
1728                None,
1729            )));
1730            send_data_command(cmd_endpoint, command);
1731            *command_sent_clone.borrow_mut() = true;
1732        });
1733
1734        let data_endpoint: MStr<Endpoint> = "ReentrantTest.data2".into();
1735        register_data_endpoint(data_endpoint, data_handler);
1736
1737        let quote = QuoteTick::default();
1738        send_data(data_endpoint, Data::Quote(quote));
1739
1740        assert!(*command_sent.borrow(), "Data handler should have run");
1741        assert!(
1742            *cmd_received.borrow(),
1743            "Data command should have been received"
1744        );
1745    }
1746
1747    #[rstest]
1748    fn test_trading_command_handler_can_send_order_event() {
1749        // Tests that a handler processing a trading command can send an order event
1750        // without causing a borrow conflict. This is the reverse direction of the
1751        // common re-entrancy scenario.
1752        let _msgbus = get_message_bus();
1753        let event_sent = Rc::new(RefCell::new(false));
1754        let event_sent_clone = event_sent.clone();
1755
1756        let evt_received = Rc::new(RefCell::new(false));
1757        let evt_received_clone = evt_received.clone();
1758        let evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1759            *evt_received_clone.borrow_mut() = true;
1760        });
1761        let evt_endpoint: MStr<Endpoint> = "ReentrantTest.orderEvt2".into();
1762        register_order_event_endpoint(evt_endpoint, evt_handler);
1763
1764        let cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1765            let event = OrderEventAny::Denied(OrderDenied::new(
1766                TraderId::new("TESTER-001"),
1767                StrategyId::new("S-001"),
1768                InstrumentId::from("TEST.VENUE"),
1769                ClientOrderId::new("O-001"),
1770                "Test denial".into(),
1771                UUID4::new(),
1772                0.into(),
1773                0.into(),
1774            ));
1775            send_order_event(evt_endpoint, event);
1776            *event_sent_clone.borrow_mut() = true;
1777        });
1778
1779        let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.execCmd2".into();
1780        register_trading_command_endpoint(cmd_endpoint, cmd_handler);
1781
1782        let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1783            TraderId::new("TESTER-001"),
1784            None,
1785            StrategyId::new("S-001"),
1786            InstrumentId::from("TEST.VENUE"),
1787            OrderSide::Buy,
1788            UUID4::new(),
1789            0.into(),
1790            None,
1791        ));
1792        send_trading_command(cmd_endpoint, command);
1793
1794        assert!(
1795            *event_sent.borrow(),
1796            "Trading command handler should have run"
1797        );
1798        assert!(
1799            *evt_received.borrow(),
1800            "Order event should have been received"
1801        );
1802    }
1803
1804    #[rstest]
1805    fn test_nested_reentrant_calls() {
1806        // Tests deeply nested re-entrant calls: order event -> trading command -> order event.
1807        // This simulates a complex scenario where handlers chain multiple calls.
1808        let _msgbus = get_message_bus();
1809        let call_depth = Rc::new(RefCell::new(0u32));
1810
1811        let final_received = Rc::new(RefCell::new(false));
1812        let final_received_clone = final_received.clone();
1813        let final_evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1814            *final_received_clone.borrow_mut() = true;
1815        });
1816        let final_evt_endpoint: MStr<Endpoint> = "ReentrantTest.finalEvt".into();
1817        register_order_event_endpoint(final_evt_endpoint, final_evt_handler);
1818
1819        let call_depth_clone2 = call_depth.clone();
1820        let mid_cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1821            *call_depth_clone2.borrow_mut() += 1;
1822            let event = OrderEventAny::Denied(OrderDenied::new(
1823                TraderId::new("TESTER-001"),
1824                StrategyId::new("S-001"),
1825                InstrumentId::from("TEST.VENUE"),
1826                ClientOrderId::new("O-002"),
1827                "Nested denial".into(),
1828                UUID4::new(),
1829                0.into(),
1830                0.into(),
1831            ));
1832            send_order_event(final_evt_endpoint, event);
1833        });
1834        let mid_cmd_endpoint: MStr<Endpoint> = "ReentrantTest.midCmd".into();
1835        register_trading_command_endpoint(mid_cmd_endpoint, mid_cmd_handler);
1836
1837        let call_depth_clone1 = call_depth.clone();
1838        let init_evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1839            *call_depth_clone1.borrow_mut() += 1;
1840            let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1841                TraderId::new("TESTER-001"),
1842                None,
1843                StrategyId::new("S-001"),
1844                InstrumentId::from("TEST.VENUE"),
1845                OrderSide::Buy,
1846                UUID4::new(),
1847                0.into(),
1848                None,
1849            ));
1850            send_trading_command(mid_cmd_endpoint, command);
1851        });
1852        let init_evt_endpoint: MStr<Endpoint> = "ReentrantTest.initEvt".into();
1853        register_order_event_endpoint(init_evt_endpoint, init_evt_handler);
1854
1855        let event = OrderEventAny::Denied(OrderDenied::new(
1856            TraderId::new("TESTER-001"),
1857            StrategyId::new("S-001"),
1858            InstrumentId::from("TEST.VENUE"),
1859            ClientOrderId::new("O-001"),
1860            "Initial denial".into(),
1861            UUID4::new(),
1862            0.into(),
1863            0.into(),
1864        ));
1865        send_order_event(init_evt_endpoint, event);
1866
1867        assert_eq!(
1868            *call_depth.borrow(),
1869            2,
1870            "Both intermediate handlers should have run"
1871        );
1872        assert!(
1873            *final_received.borrow(),
1874            "Final event handler should have received the event"
1875        );
1876    }
1877}