1use std::{any::Any, cell::RefCell, rc::Rc, thread::LocalKey};
27
28use nautilus_core::UUID4;
29#[cfg(feature = "defi")]
30use nautilus_model::defi::{
31 Block, DefiData, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
32};
33use nautilus_model::{
34 data::{
35 Bar, Data, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate,
36 OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
37 },
38 events::{AccountState, OrderEventAny, PositionEvent},
39 orderbook::OrderBook,
40 orders::OrderAny,
41 position::Position,
42};
43use smallvec::SmallVec;
44use ustr::Ustr;
45
46use super::{
47 ACCOUNT_STATE_HANDLERS, ANY_HANDLERS, BAR_HANDLERS, BOOK_HANDLERS, DELTAS_HANDLERS,
48 DEPTH10_HANDLERS, FUNDING_RATE_HANDLERS, GREEKS_HANDLERS, HANDLER_BUFFER_CAP,
49 INDEX_PRICE_HANDLERS, MARK_PRICE_HANDLERS, MESSAGE_BUS, ORDER_EVENT_HANDLERS,
50 POSITION_EVENT_HANDLERS, QUOTE_HANDLERS, TRADE_HANDLERS,
51 core::{MessageBus, Subscription},
52 get_message_bus,
53 matching::is_matching_backtracking,
54 mstr::{Endpoint, MStr, Pattern, Topic},
55 typed_handler::{ShareableMessageHandler, TypedHandler, TypedIntoHandler},
56};
57#[cfg(feature = "defi")]
58use super::{
59 DEFI_BLOCK_HANDLERS, DEFI_COLLECT_HANDLERS, DEFI_FLASH_HANDLERS, DEFI_LIQUIDITY_HANDLERS,
60 DEFI_POOL_HANDLERS, DEFI_SWAP_HANDLERS,
61};
62use crate::messages::{
63 data::{DataCommand, DataResponse},
64 execution::{ExecutionReport, TradingCommand},
65};
66
67pub fn register_any(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
69 log::debug!(
70 "Registering endpoint '{endpoint}' with handler ID {}",
71 handler.0.id(),
72 );
73 get_message_bus()
74 .borrow_mut()
75 .endpoints
76 .insert(endpoint, handler);
77}
78
79pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
81 if let Err(e) = get_message_bus()
82 .borrow_mut()
83 .register_response_handler(correlation_id, handler)
84 {
85 log::error!("Failed to register request handler: {e}");
86 }
87}
88
89pub fn register_quote_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<QuoteTick>) {
91 get_message_bus()
92 .borrow_mut()
93 .endpoints_quotes
94 .register(endpoint, handler);
95}
96
97pub fn register_trade_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<TradeTick>) {
99 get_message_bus()
100 .borrow_mut()
101 .endpoints_trades
102 .register(endpoint, handler);
103}
104
105pub fn register_bar_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<Bar>) {
107 get_message_bus()
108 .borrow_mut()
109 .endpoints_bars
110 .register(endpoint, handler);
111}
112
113pub fn register_order_event_endpoint(
115 endpoint: MStr<Endpoint>,
116 handler: TypedIntoHandler<OrderEventAny>,
117) {
118 get_message_bus()
119 .borrow_mut()
120 .endpoints_order_events
121 .register(endpoint, handler);
122}
123
124pub fn register_account_state_endpoint(
126 endpoint: MStr<Endpoint>,
127 handler: TypedHandler<AccountState>,
128) {
129 get_message_bus()
130 .borrow_mut()
131 .endpoints_account_state
132 .register(endpoint, handler);
133}
134
135pub fn register_trading_command_endpoint(
137 endpoint: MStr<Endpoint>,
138 handler: TypedIntoHandler<TradingCommand>,
139) {
140 get_message_bus()
141 .borrow_mut()
142 .endpoints_trading_commands
143 .register(endpoint, handler);
144}
145
146pub fn register_data_command_endpoint(
148 endpoint: MStr<Endpoint>,
149 handler: TypedIntoHandler<DataCommand>,
150) {
151 get_message_bus()
152 .borrow_mut()
153 .endpoints_data_commands
154 .register(endpoint, handler);
155}
156
157pub fn register_data_response_endpoint(
159 endpoint: MStr<Endpoint>,
160 handler: TypedIntoHandler<DataResponse>,
161) {
162 get_message_bus()
163 .borrow_mut()
164 .endpoints_data_responses
165 .register(endpoint, handler);
166}
167
168pub fn register_execution_report_endpoint(
170 endpoint: MStr<Endpoint>,
171 handler: TypedIntoHandler<ExecutionReport>,
172) {
173 get_message_bus()
174 .borrow_mut()
175 .endpoints_exec_reports
176 .register(endpoint, handler);
177}
178
179pub fn register_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<Data>) {
181 get_message_bus()
182 .borrow_mut()
183 .endpoints_data
184 .register(endpoint, handler);
185}
186
/// Registers `handler` as the DeFi data endpoint `endpoint`.
///
/// The handler is stored in the message bus's `endpoints_defi_data` table.
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn register_defi_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<DefiData>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .endpoints_defi_data
        .register(endpoint, handler);
}
195
196pub fn deregister_any(endpoint: MStr<Endpoint>) {
198 log::debug!("Deregistering endpoint '{endpoint}'");
199 get_message_bus()
200 .borrow_mut()
201 .endpoints
202 .shift_remove(&endpoint);
203}
204
205pub fn subscribe_any(
216 pattern: MStr<Pattern>,
217 handler: ShareableMessageHandler,
218 priority: Option<u8>,
219) {
220 let msgbus = get_message_bus();
221 let mut msgbus_ref_mut = msgbus.borrow_mut();
222 let sub = Subscription::new(pattern, handler, priority);
223
224 log::debug!(
225 "Subscribing {:?} for pattern '{}'",
226 sub.handler,
227 sub.pattern
228 );
229
230 if msgbus_ref_mut.subscriptions.contains(&sub) {
231 log::warn!("{sub:?} already exists");
232 return;
233 }
234
235 for (topic, subs) in &mut msgbus_ref_mut.topics {
236 if is_matching_backtracking(*topic, sub.pattern) {
237 subs.push(sub.clone());
238 subs.sort();
239 log::debug!("Added subscription for '{topic}'");
240 }
241 }
242
243 msgbus_ref_mut.subscriptions.insert(sub);
244}
245
246pub fn subscribe_instruments(
248 pattern: MStr<Pattern>,
249 handler: ShareableMessageHandler,
250 priority: Option<u8>,
251) {
252 subscribe_any(pattern, handler, priority);
253}
254
255pub fn subscribe_instrument_close(
257 pattern: MStr<Pattern>,
258 handler: ShareableMessageHandler,
259 priority: Option<u8>,
260) {
261 subscribe_any(pattern, handler, priority);
262}
263
264pub fn subscribe_book_deltas(
266 pattern: MStr<Pattern>,
267 handler: TypedHandler<OrderBookDeltas>,
268 priority: Option<u8>,
269) {
270 get_message_bus()
271 .borrow_mut()
272 .router_deltas
273 .subscribe(pattern, handler, priority.unwrap_or(0));
274}
275
276pub fn subscribe_book_depth10(
278 pattern: MStr<Pattern>,
279 handler: TypedHandler<OrderBookDepth10>,
280 priority: Option<u8>,
281) {
282 get_message_bus().borrow_mut().router_depth10.subscribe(
283 pattern,
284 handler,
285 priority.unwrap_or(0),
286 );
287}
288
289pub fn subscribe_book_snapshots(
291 pattern: MStr<Pattern>,
292 handler: TypedHandler<OrderBook>,
293 priority: Option<u8>,
294) {
295 get_message_bus()
296 .borrow_mut()
297 .router_book_snapshots
298 .subscribe(pattern, handler, priority.unwrap_or(0));
299}
300
301pub fn subscribe_quotes(
303 pattern: MStr<Pattern>,
304 handler: TypedHandler<QuoteTick>,
305 priority: Option<u8>,
306) {
307 get_message_bus()
308 .borrow_mut()
309 .router_quotes
310 .subscribe(pattern, handler, priority.unwrap_or(0));
311}
312
313pub fn subscribe_trades(
315 pattern: MStr<Pattern>,
316 handler: TypedHandler<TradeTick>,
317 priority: Option<u8>,
318) {
319 get_message_bus()
320 .borrow_mut()
321 .router_trades
322 .subscribe(pattern, handler, priority.unwrap_or(0));
323}
324
325pub fn subscribe_bars(pattern: MStr<Pattern>, handler: TypedHandler<Bar>, priority: Option<u8>) {
327 get_message_bus()
328 .borrow_mut()
329 .router_bars
330 .subscribe(pattern, handler, priority.unwrap_or(0));
331}
332
333pub fn subscribe_mark_prices(
335 pattern: MStr<Pattern>,
336 handler: TypedHandler<MarkPriceUpdate>,
337 priority: Option<u8>,
338) {
339 get_message_bus().borrow_mut().router_mark_prices.subscribe(
340 pattern,
341 handler,
342 priority.unwrap_or(0),
343 );
344}
345
346pub fn subscribe_index_prices(
348 pattern: MStr<Pattern>,
349 handler: TypedHandler<IndexPriceUpdate>,
350 priority: Option<u8>,
351) {
352 get_message_bus()
353 .borrow_mut()
354 .router_index_prices
355 .subscribe(pattern, handler, priority.unwrap_or(0));
356}
357
358pub fn subscribe_funding_rates(
360 pattern: MStr<Pattern>,
361 handler: TypedHandler<FundingRateUpdate>,
362 priority: Option<u8>,
363) {
364 get_message_bus()
365 .borrow_mut()
366 .router_funding_rates
367 .subscribe(pattern, handler, priority.unwrap_or(0));
368}
369
370pub fn subscribe_greeks(
372 pattern: MStr<Pattern>,
373 handler: TypedHandler<GreeksData>,
374 priority: Option<u8>,
375) {
376 get_message_bus()
377 .borrow_mut()
378 .router_greeks
379 .subscribe(pattern, handler, priority.unwrap_or(0));
380}
381
382pub fn subscribe_order_events(
384 pattern: MStr<Pattern>,
385 handler: TypedHandler<OrderEventAny>,
386 priority: Option<u8>,
387) {
388 get_message_bus()
389 .borrow_mut()
390 .router_order_events
391 .subscribe(pattern, handler, priority.unwrap_or(0));
392}
393
394pub fn subscribe_position_events(
396 pattern: MStr<Pattern>,
397 handler: TypedHandler<PositionEvent>,
398 priority: Option<u8>,
399) {
400 get_message_bus()
401 .borrow_mut()
402 .router_position_events
403 .subscribe(pattern, handler, priority.unwrap_or(0));
404}
405
406pub fn subscribe_account_state(
408 pattern: MStr<Pattern>,
409 handler: TypedHandler<AccountState>,
410 priority: Option<u8>,
411) {
412 get_message_bus()
413 .borrow_mut()
414 .router_account_state
415 .subscribe(pattern, handler, priority.unwrap_or(0));
416}
417
418pub fn subscribe_positions(
420 pattern: MStr<Pattern>,
421 handler: TypedHandler<Position>,
422 priority: Option<u8>,
423) {
424 get_message_bus().borrow_mut().router_positions.subscribe(
425 pattern,
426 handler,
427 priority.unwrap_or(0),
428 );
429}
430
/// Subscribes `handler` to DeFi block topics matching `pattern`.
///
/// The subscription goes to the bus's `router_defi_blocks`; a `priority` of `None` is
/// passed on as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_blocks(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Block>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_blocks
        .subscribe(pattern, handler, priority);
}
444
/// Subscribes `handler` to DeFi pool topics matching `pattern`.
///
/// The subscription goes to the bus's `router_defi_pools`; a `priority` of `None` is
/// passed on as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_pools(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Pool>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_pools
        .subscribe(pattern, handler, priority);
}
458
/// Subscribes `handler` to DeFi pool swap topics matching `pattern`.
///
/// The subscription goes to the bus's `router_defi_swaps`; a `priority` of `None` is
/// passed on as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_swaps(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolSwap>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_swaps
        .subscribe(pattern, handler, priority);
}
472
/// Subscribes `handler` to DeFi pool liquidity update topics matching `pattern`.
///
/// The subscription goes to the bus's `router_defi_liquidity`; a `priority` of `None`
/// is passed on as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_liquidity(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolLiquidityUpdate>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_liquidity
        .subscribe(pattern, handler, priority);
}
485
/// Subscribes `handler` to DeFi pool fee-collect topics matching `pattern`.
///
/// The subscription goes to the bus's `router_defi_collects`; a `priority` of `None`
/// is passed on as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_collects(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolFeeCollect>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_collects
        .subscribe(pattern, handler, priority);
}
498
/// Subscribes `handler` to DeFi pool flash topics matching `pattern`.
///
/// The subscription goes to the bus's `router_defi_flash`; a `priority` of `None` is
/// passed on as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_flash(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolFlash>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_flash
        .subscribe(pattern, handler, priority);
}
512
513pub fn unsubscribe_instruments(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
515 unsubscribe_any(pattern, handler);
516}
517
518pub fn unsubscribe_instrument_close(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
520 unsubscribe_any(pattern, handler);
521}
522
523pub fn unsubscribe_book_deltas(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDeltas>) {
525 get_message_bus()
526 .borrow_mut()
527 .router_deltas
528 .unsubscribe(pattern, handler);
529}
530
531pub fn unsubscribe_book_depth10(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDepth10>) {
533 get_message_bus()
534 .borrow_mut()
535 .router_depth10
536 .unsubscribe(pattern, handler);
537}
538
539pub fn unsubscribe_book_snapshots(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBook>) {
541 get_message_bus()
542 .borrow_mut()
543 .router_book_snapshots
544 .unsubscribe(pattern, handler);
545}
546
547pub fn unsubscribe_quotes(pattern: MStr<Pattern>, handler: &TypedHandler<QuoteTick>) {
549 get_message_bus()
550 .borrow_mut()
551 .router_quotes
552 .unsubscribe(pattern, handler);
553}
554
555pub fn unsubscribe_trades(pattern: MStr<Pattern>, handler: &TypedHandler<TradeTick>) {
557 get_message_bus()
558 .borrow_mut()
559 .router_trades
560 .unsubscribe(pattern, handler);
561}
562
563pub fn unsubscribe_bars(pattern: MStr<Pattern>, handler: &TypedHandler<Bar>) {
565 get_message_bus()
566 .borrow_mut()
567 .router_bars
568 .unsubscribe(pattern, handler);
569}
570
571pub fn unsubscribe_mark_prices(pattern: MStr<Pattern>, handler: &TypedHandler<MarkPriceUpdate>) {
573 get_message_bus()
574 .borrow_mut()
575 .router_mark_prices
576 .unsubscribe(pattern, handler);
577}
578
579pub fn unsubscribe_index_prices(pattern: MStr<Pattern>, handler: &TypedHandler<IndexPriceUpdate>) {
581 get_message_bus()
582 .borrow_mut()
583 .router_index_prices
584 .unsubscribe(pattern, handler);
585}
586
587pub fn unsubscribe_funding_rates(
589 pattern: MStr<Pattern>,
590 handler: &TypedHandler<FundingRateUpdate>,
591) {
592 get_message_bus()
593 .borrow_mut()
594 .router_funding_rates
595 .unsubscribe(pattern, handler);
596}
597
598pub fn unsubscribe_account_state(pattern: MStr<Pattern>, handler: &TypedHandler<AccountState>) {
600 get_message_bus()
601 .borrow_mut()
602 .router_account_state
603 .unsubscribe(pattern, handler);
604}
605
606pub fn unsubscribe_order_events(pattern: MStr<Pattern>, handler: &TypedHandler<OrderEventAny>) {
608 get_message_bus()
609 .borrow_mut()
610 .router_order_events
611 .unsubscribe(pattern, handler);
612}
613
614pub fn unsubscribe_position_events(pattern: MStr<Pattern>, handler: &TypedHandler<PositionEvent>) {
616 get_message_bus()
617 .borrow_mut()
618 .router_position_events
619 .unsubscribe(pattern, handler);
620}
621
622pub fn remove_order_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
624 get_message_bus()
625 .borrow_mut()
626 .router_order_events
627 .remove_handler(pattern, handler_id);
628}
629
630pub fn remove_position_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
632 get_message_bus()
633 .borrow_mut()
634 .router_position_events
635 .remove_handler(pattern, handler_id);
636}
637
638pub fn unsubscribe_orders(pattern: MStr<Pattern>, handler: &TypedHandler<OrderAny>) {
640 get_message_bus()
641 .borrow_mut()
642 .router_orders
643 .unsubscribe(pattern, handler);
644}
645
646pub fn unsubscribe_positions(pattern: MStr<Pattern>, handler: &TypedHandler<Position>) {
648 get_message_bus()
649 .borrow_mut()
650 .router_positions
651 .unsubscribe(pattern, handler);
652}
653
654pub fn unsubscribe_greeks(pattern: MStr<Pattern>, handler: &TypedHandler<GreeksData>) {
656 get_message_bus()
657 .borrow_mut()
658 .router_greeks
659 .unsubscribe(pattern, handler);
660}
661
/// Asks the bus's `router_defi_blocks` to unsubscribe `handler` from `pattern`.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_blocks(pattern: MStr<Pattern>, handler: &TypedHandler<Block>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_blocks
        .unsubscribe(pattern, handler);
}
670
/// Asks the bus's `router_defi_pools` to unsubscribe `handler` from `pattern`.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_pools(pattern: MStr<Pattern>, handler: &TypedHandler<Pool>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_pools
        .unsubscribe(pattern, handler);
}
679
/// Asks the bus's `router_defi_swaps` to unsubscribe `handler` from `pattern`.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_swaps(pattern: MStr<Pattern>, handler: &TypedHandler<PoolSwap>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_swaps
        .unsubscribe(pattern, handler);
}
688
/// Asks the bus's `router_defi_liquidity` to unsubscribe `handler` from `pattern`.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_liquidity(
    pattern: MStr<Pattern>,
    handler: &TypedHandler<PoolLiquidityUpdate>,
) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_liquidity
        .unsubscribe(pattern, handler);
}
700
/// Asks the bus's `router_defi_collects` to unsubscribe `handler` from `pattern`.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_collects(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFeeCollect>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_collects
        .unsubscribe(pattern, handler);
}
709
/// Asks the bus's `router_defi_flash` to unsubscribe `handler` from `pattern`.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_flash(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFlash>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_flash
        .unsubscribe(pattern, handler);
}
718
719pub fn unsubscribe_any(pattern: MStr<Pattern>, handler: ShareableMessageHandler) {
721 log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
722
723 let handler_id = handler.0.id();
724 let bus_rc = get_message_bus();
725 let mut bus = bus_rc.borrow_mut();
726
727 let count_before = bus.subscriptions.len();
728
729 bus.topics.values_mut().for_each(|subs| {
730 subs.retain(|s| !(s.pattern == pattern && s.handler_id == handler_id));
731 });
732
733 bus.subscriptions
734 .retain(|s| !(s.pattern == pattern && s.handler_id == handler_id));
735
736 let removed = bus.subscriptions.len() < count_before;
737
738 if removed {
739 log::debug!("Handler for pattern '{pattern}' was removed");
740 } else {
741 log::debug!("No matching handler for pattern '{pattern}' was found");
742 }
743}
744
745pub fn is_subscribed_any<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
747 let pattern = MStr::from(pattern.as_ref());
748 let sub = Subscription::new(pattern, handler, None);
749 get_message_bus().borrow().subscriptions.contains(&sub)
750}
751
752pub fn subscriptions_count_any<S: AsRef<str>>(topic: S) -> usize {
754 get_message_bus().borrow().subscriptions_count(topic)
755}
756
757pub fn subscriber_count_deltas(topic: MStr<Topic>) -> usize {
759 get_message_bus()
760 .borrow()
761 .router_deltas
762 .subscriber_count(topic)
763}
764
765pub fn subscriber_count_depth10(topic: MStr<Topic>) -> usize {
767 get_message_bus()
768 .borrow()
769 .router_depth10
770 .subscriber_count(topic)
771}
772
773pub fn subscriber_count_book_snapshots(topic: MStr<Topic>) -> usize {
775 get_message_bus()
776 .borrow()
777 .router_book_snapshots
778 .subscriber_count(topic)
779}
780
781pub fn exact_subscriber_count_bars(topic: MStr<Topic>) -> usize {
784 get_message_bus()
785 .borrow()
786 .router_bars
787 .exact_subscriber_count(topic)
788}
789
790pub fn publish_any(topic: MStr<Topic>, message: &dyn Any) {
792 let mut handlers = ANY_HANDLERS.with_borrow_mut(std::mem::take);
794
795 get_message_bus()
796 .borrow_mut()
797 .fill_matching_any_handlers(topic, &mut handlers);
798
799 for handler in &handlers {
800 handler.0.handle(message);
801 }
802
803 handlers.clear(); ANY_HANDLERS.with_borrow_mut(|buf| *buf = handlers);
805}
806
807pub fn publish_deltas(topic: MStr<Topic>, deltas: &OrderBookDeltas) {
809 publish_typed(
810 &DELTAS_HANDLERS,
811 |bus, h| bus.router_deltas.fill_matching_handlers(topic, h),
812 deltas,
813 );
814}
815
816pub fn publish_depth10(topic: MStr<Topic>, depth: &OrderBookDepth10) {
818 publish_typed(
819 &DEPTH10_HANDLERS,
820 |bus, h| bus.router_depth10.fill_matching_handlers(topic, h),
821 depth,
822 );
823}
824
825pub fn publish_book(topic: MStr<Topic>, book: &OrderBook) {
827 publish_typed(
828 &BOOK_HANDLERS,
829 |bus, h| bus.router_book_snapshots.fill_matching_handlers(topic, h),
830 book,
831 );
832}
833
834pub fn publish_quote(topic: MStr<Topic>, quote: &QuoteTick) {
836 publish_typed(
837 "E_HANDLERS,
838 |bus, h| bus.router_quotes.fill_matching_handlers(topic, h),
839 quote,
840 );
841}
842
843pub fn publish_trade(topic: MStr<Topic>, trade: &TradeTick) {
845 publish_typed(
846 &TRADE_HANDLERS,
847 |bus, h| bus.router_trades.fill_matching_handlers(topic, h),
848 trade,
849 );
850}
851
852pub fn publish_bar(topic: MStr<Topic>, bar: &Bar) {
854 publish_typed(
855 &BAR_HANDLERS,
856 |bus, h| bus.router_bars.fill_matching_handlers(topic, h),
857 bar,
858 );
859}
860
861pub fn publish_mark_price(topic: MStr<Topic>, mark_price: &MarkPriceUpdate) {
863 publish_typed(
864 &MARK_PRICE_HANDLERS,
865 |bus, h| bus.router_mark_prices.fill_matching_handlers(topic, h),
866 mark_price,
867 );
868}
869
870pub fn publish_index_price(topic: MStr<Topic>, index_price: &IndexPriceUpdate) {
872 publish_typed(
873 &INDEX_PRICE_HANDLERS,
874 |bus, h| bus.router_index_prices.fill_matching_handlers(topic, h),
875 index_price,
876 );
877}
878
879pub fn publish_funding_rate(topic: MStr<Topic>, funding_rate: &FundingRateUpdate) {
881 publish_typed(
882 &FUNDING_RATE_HANDLERS,
883 |bus, h| bus.router_funding_rates.fill_matching_handlers(topic, h),
884 funding_rate,
885 );
886}
887
888pub fn publish_greeks(topic: MStr<Topic>, greeks: &GreeksData) {
890 publish_typed(
891 &GREEKS_HANDLERS,
892 |bus, h| bus.router_greeks.fill_matching_handlers(topic, h),
893 greeks,
894 );
895}
896
897pub fn publish_account_state(topic: MStr<Topic>, state: &AccountState) {
899 publish_typed(
900 &ACCOUNT_STATE_HANDLERS,
901 |bus, h| bus.router_account_state.fill_matching_handlers(topic, h),
902 state,
903 );
904}
905
906pub fn publish_order_event(topic: MStr<Topic>, event: &OrderEventAny) {
908 publish_typed(
909 &ORDER_EVENT_HANDLERS,
910 |bus, h| bus.router_order_events.fill_matching_handlers(topic, h),
911 event,
912 );
913}
914
915pub fn publish_position_event(topic: MStr<Topic>, event: &PositionEvent) {
917 publish_typed(
918 &POSITION_EVENT_HANDLERS,
919 |bus, h| bus.router_position_events.fill_matching_handlers(topic, h),
920 event,
921 );
922}
923
/// Publishes `block` to the handlers that `router_defi_blocks` matches for `topic`.
///
/// Uses the `DEFI_BLOCK_HANDLERS` thread-local buffer via `publish_typed`.
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_block(topic: MStr<Topic>, block: &Block) {
    publish_typed(
        &DEFI_BLOCK_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_blocks
                .fill_matching_handlers(topic, matched)
        },
        block,
    );
}
933
/// Publishes `pool` to the handlers that `router_defi_pools` matches for `topic`.
///
/// Uses the `DEFI_POOL_HANDLERS` thread-local buffer via `publish_typed`.
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_pool(topic: MStr<Topic>, pool: &Pool) {
    publish_typed(
        &DEFI_POOL_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_pools
                .fill_matching_handlers(topic, matched)
        },
        pool,
    );
}
943
/// Publishes `swap` to the handlers that `router_defi_swaps` matches for `topic`.
///
/// Uses the `DEFI_SWAP_HANDLERS` thread-local buffer via `publish_typed`.
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_swap(topic: MStr<Topic>, swap: &PoolSwap) {
    publish_typed(
        &DEFI_SWAP_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_swaps
                .fill_matching_handlers(topic, matched)
        },
        swap,
    );
}
953
/// Publishes `update` to the handlers that `router_defi_liquidity` matches for `topic`.
///
/// Uses the `DEFI_LIQUIDITY_HANDLERS` thread-local buffer via `publish_typed`.
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_liquidity(topic: MStr<Topic>, update: &PoolLiquidityUpdate) {
    publish_typed(
        &DEFI_LIQUIDITY_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_liquidity
                .fill_matching_handlers(topic, matched)
        },
        update,
    );
}
963
/// Publishes `collect` to the handlers that `router_defi_collects` matches for `topic`.
///
/// Uses the `DEFI_COLLECT_HANDLERS` thread-local buffer via `publish_typed`.
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_collect(topic: MStr<Topic>, collect: &PoolFeeCollect) {
    publish_typed(
        &DEFI_COLLECT_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_collects
                .fill_matching_handlers(topic, matched)
        },
        collect,
    );
}
973
/// Publishes `flash` to the handlers that `router_defi_flash` matches for `topic`.
///
/// Uses the `DEFI_FLASH_HANDLERS` thread-local buffer via `publish_typed`.
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_flash(topic: MStr<Topic>, flash: &PoolFlash) {
    publish_typed(
        &DEFI_FLASH_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_flash
                .fill_matching_handlers(topic, matched)
        },
        flash,
    );
}
983
984#[inline]
994fn publish_typed<T: 'static>(
995 tls: &'static LocalKey<RefCell<SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>>>,
996 fill_fn: impl FnOnce(&mut MessageBus, &mut SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>),
997 message: &T,
998) {
999 let mut handlers = tls.with_borrow_mut(std::mem::take);
1001
1002 MESSAGE_BUS.with(|cell| {
1004 let rc = cell.get_or_init(|| Rc::new(RefCell::new(MessageBus::default())));
1005 fill_fn(&mut rc.borrow_mut(), &mut handlers);
1006 });
1007
1008 for handler in &handlers {
1009 handler.handle(message);
1010 }
1011
1012 handlers.clear(); tls.with_borrow_mut(|buf| *buf = handlers);
1014}
1015
1016pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
1018 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
1019
1020 if let Some(handler) = handler {
1021 handler.0.handle(message);
1022 } else {
1023 log::error!("send_any: no registered endpoint '{endpoint}'");
1024 }
1025}
1026
1027pub fn send_any_value<T: 'static>(endpoint: MStr<Endpoint>, message: T) {
1029 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
1030
1031 if let Some(handler) = handler {
1032 handler.0.handle(&message);
1033 } else {
1034 log::error!("send_any_value: no registered endpoint '{endpoint}'");
1035 }
1036}
1037
1038pub fn send_response(correlation_id: &UUID4, message: DataResponse) {
1040 let handler = get_message_bus()
1041 .borrow()
1042 .get_response_handler(correlation_id)
1043 .cloned();
1044
1045 if let Some(handler) = handler {
1046 match &message {
1047 DataResponse::Data(resp) => handler.0.handle(resp),
1048 DataResponse::Instrument(resp) => handler.0.handle(resp.as_ref()),
1049 DataResponse::Instruments(resp) => handler.0.handle(resp),
1050 DataResponse::Book(resp) => handler.0.handle(resp),
1051 DataResponse::Quotes(resp) => handler.0.handle(resp),
1052 DataResponse::Trades(resp) => handler.0.handle(resp),
1053 DataResponse::FundingRates(resp) => handler.0.handle(resp),
1054 DataResponse::Bars(resp) => handler.0.handle(resp),
1055 }
1056 } else {
1057 log::error!("send_response: handler not found for correlation_id '{correlation_id}'");
1058 }
1059}
1060
1061pub fn send_quote(endpoint: MStr<Endpoint>, quote: &QuoteTick) {
1063 send_endpoint_ref(
1064 endpoint,
1065 quote,
1066 |bus| bus.endpoints_quotes.get(endpoint),
1067 "send_quote",
1068 );
1069}
1070
1071pub fn send_trade(endpoint: MStr<Endpoint>, trade: &TradeTick) {
1073 send_endpoint_ref(
1074 endpoint,
1075 trade,
1076 |bus| bus.endpoints_trades.get(endpoint),
1077 "send_trade",
1078 );
1079}
1080
1081pub fn send_bar(endpoint: MStr<Endpoint>, bar: &Bar) {
1083 send_endpoint_ref(
1084 endpoint,
1085 bar,
1086 |bus| bus.endpoints_bars.get(endpoint),
1087 "send_bar",
1088 );
1089}
1090
1091pub fn send_order_event(endpoint: MStr<Endpoint>, event: OrderEventAny) {
1093 send_endpoint_owned(
1094 endpoint,
1095 event,
1096 |bus| bus.endpoints_order_events.get(endpoint),
1097 "send_order_event",
1098 );
1099}
1100
1101pub fn send_account_state(endpoint: MStr<Endpoint>, state: &AccountState) {
1103 send_endpoint_ref(
1104 endpoint,
1105 state,
1106 |bus| bus.endpoints_account_state.get(endpoint),
1107 "send_account_state",
1108 );
1109}
1110
1111pub fn send_trading_command(endpoint: MStr<Endpoint>, command: TradingCommand) {
1113 send_endpoint_owned(
1114 endpoint,
1115 command,
1116 |bus| bus.endpoints_trading_commands.get(endpoint),
1117 "send_trading_command",
1118 );
1119}
1120
1121pub fn send_data_command(endpoint: MStr<Endpoint>, command: DataCommand) {
1123 send_endpoint_owned(
1124 endpoint,
1125 command,
1126 |bus| bus.endpoints_data_commands.get(endpoint),
1127 "send_data_command",
1128 );
1129}
1130
1131pub fn send_data_response(endpoint: MStr<Endpoint>, response: DataResponse) {
1133 send_endpoint_owned(
1134 endpoint,
1135 response,
1136 |bus| bus.endpoints_data_responses.get(endpoint),
1137 "send_data_response",
1138 );
1139}
1140
1141pub fn send_execution_report(endpoint: MStr<Endpoint>, report: ExecutionReport) {
1143 send_endpoint_owned(
1144 endpoint,
1145 report,
1146 |bus| bus.endpoints_exec_reports.get(endpoint),
1147 "send_execution_report",
1148 );
1149}
1150
1151pub fn send_data(endpoint: MStr<Endpoint>, data: Data) {
1153 send_endpoint_owned(
1154 endpoint,
1155 data,
1156 |bus| bus.endpoints_data.get(endpoint),
1157 "send_data",
1158 );
1159}
1160
/// Sends the owned `data` to the handler registered under `endpoint` in
/// `endpoints_defi_data`.
///
/// Delegates to `send_endpoint_owned`, which logs an error when no handler is found.
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn send_defi_data(endpoint: MStr<Endpoint>, data: DefiData) {
    send_endpoint_owned(
        endpoint,
        data,
        |msgbus| msgbus.endpoints_defi_data.get(endpoint),
        "send_defi_data",
    );
}
1171
1172#[inline]
1173fn send_endpoint_ref<T: 'static, F>(
1174 endpoint: MStr<Endpoint>,
1175 message: &T,
1176 get_handler: F,
1177 fn_name: &str,
1178) where
1179 F: FnOnce(&MessageBus) -> Option<&TypedHandler<T>>,
1180{
1181 let handler = {
1182 let bus = get_message_bus();
1183 get_handler(&bus.borrow()).cloned()
1184 };
1185
1186 if let Some(handler) = handler {
1187 handler.handle(message);
1188 } else {
1189 log::error!("{fn_name}: no registered endpoint '{endpoint}'");
1190 }
1191}
1192
1193#[inline]
1194fn send_endpoint_owned<T: 'static, F>(
1195 endpoint: MStr<Endpoint>,
1196 message: T,
1197 get_handler: F,
1198 fn_name: &str,
1199) where
1200 F: FnOnce(&MessageBus) -> Option<&TypedIntoHandler<T>>,
1201{
1202 let handler = {
1203 let bus = get_message_bus();
1204 get_handler(&bus.borrow()).cloned()
1205 };
1206
1207 if let Some(handler) = handler {
1208 handler.handle(message);
1209 } else {
1210 log::error!("{fn_name}: no registered endpoint '{endpoint}'");
1211 }
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216 use std::{cell::RefCell, rc::Rc};
1224
1225 use nautilus_core::UUID4;
1226 use nautilus_model::{
1227 data::{Bar, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
1228 enums::OrderSide,
1229 events::OrderDenied,
1230 identifiers::{ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId},
1231 };
1232 use rstest::rstest;
1233
1234 use super::*;
1235 use crate::messages::{
1236 data::{DataCommand, SubscribeCommand, SubscribeQuotes},
1237 execution::{CancelAllOrders, TradingCommand},
1238 };
1239
1240 #[rstest]
1241 fn test_typed_quote_publish_subscribe_integration() {
1242 let _msgbus = get_message_bus();
1243 let received = Rc::new(RefCell::new(Vec::new()));
1244 let received_clone = received.clone();
1245
1246 let handler = TypedHandler::from(move |quote: &QuoteTick| {
1247 received_clone.borrow_mut().push(*quote);
1248 });
1249
1250 subscribe_quotes("data.quotes.*".into(), handler, None);
1251
1252 let quote = QuoteTick::default();
1253 publish_quote("data.quotes.TEST".into(), "e);
1254 publish_quote("data.quotes.TEST".into(), "e);
1255
1256 assert_eq!(received.borrow().len(), 2);
1257 }
1258
1259 #[rstest]
1260 fn test_typed_trade_publish_subscribe_integration() {
1261 let _msgbus = get_message_bus();
1262 let received = Rc::new(RefCell::new(Vec::new()));
1263 let received_clone = received.clone();
1264
1265 let handler = TypedHandler::from(move |trade: &TradeTick| {
1266 received_clone.borrow_mut().push(*trade);
1267 });
1268
1269 subscribe_trades("data.trades.*".into(), handler, None);
1270
1271 let trade = TradeTick::default();
1272 publish_trade("data.trades.TEST".into(), &trade);
1273
1274 assert_eq!(received.borrow().len(), 1);
1275 }
1276
1277 #[rstest]
1278 fn test_typed_bar_publish_subscribe_integration() {
1279 let _msgbus = get_message_bus();
1280 let received = Rc::new(RefCell::new(Vec::new()));
1281 let received_clone = received.clone();
1282
1283 let handler = TypedHandler::from(move |bar: &Bar| {
1284 received_clone.borrow_mut().push(*bar);
1285 });
1286
1287 subscribe_bars("data.bars.*".into(), handler, None);
1288
1289 let bar = Bar::default();
1290 publish_bar("data.bars.TEST".into(), &bar);
1291
1292 assert_eq!(received.borrow().len(), 1);
1293 }
1294
1295 #[rstest]
1296 fn test_typed_deltas_publish_subscribe_integration() {
1297 let _msgbus = get_message_bus();
1298 let received = Rc::new(RefCell::new(Vec::new()));
1299 let received_clone = received.clone();
1300
1301 let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1302 received_clone.borrow_mut().push(deltas.clone());
1303 });
1304
1305 subscribe_book_deltas("data.book.deltas.*".into(), handler, None);
1306
1307 let instrument_id = InstrumentId::from("TEST.VENUE");
1308 let delta = OrderBookDelta::clear(instrument_id, 0, 1.into(), 2.into());
1309 let deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1310 publish_deltas("data.book.deltas.TEST".into(), &deltas);
1311
1312 assert_eq!(received.borrow().len(), 1);
1313 }
1314
1315 #[rstest]
1316 fn test_typed_unsubscribe_stops_delivery() {
1317 let _msgbus = get_message_bus();
1318 let received = Rc::new(RefCell::new(Vec::new()));
1319 let received_clone = received.clone();
1320
1321 let handler = TypedHandler::from_with_id("unsub-test", move |quote: &QuoteTick| {
1322 received_clone.borrow_mut().push(*quote);
1323 });
1324
1325 subscribe_quotes("data.quotes.UNSUB".into(), handler.clone(), None);
1326
1327 let quote = QuoteTick::default();
1328 publish_quote("data.quotes.UNSUB".into(), "e);
1329 assert_eq!(received.borrow().len(), 1);
1330
1331 unsubscribe_quotes("data.quotes.UNSUB".into(), &handler);
1332
1333 publish_quote("data.quotes.UNSUB".into(), "e);
1334 assert_eq!(received.borrow().len(), 1);
1335 }
1336
1337 #[rstest]
1338 fn test_typed_wildcard_pattern_matching() {
1339 let _msgbus = get_message_bus();
1340 let received = Rc::new(RefCell::new(Vec::new()));
1341 let received_clone = received.clone();
1342
1343 let handler = TypedHandler::from(move |quote: &QuoteTick| {
1344 received_clone.borrow_mut().push(*quote);
1345 });
1346
1347 subscribe_quotes("data.quotes.WILD.*".into(), handler, None);
1348
1349 let quote = QuoteTick::default();
1350 publish_quote("data.quotes.WILD.AAPL".into(), "e);
1351 publish_quote("data.quotes.WILD.MSFT".into(), "e);
1352 publish_quote("data.quotes.OTHER.AAPL".into(), "e);
1353
1354 assert_eq!(received.borrow().len(), 2);
1355 }
1356
1357 #[rstest]
1358 fn test_typed_priority_ordering() {
1359 let _msgbus = get_message_bus();
1360 let order = Rc::new(RefCell::new(Vec::new()));
1361
1362 let order1 = order.clone();
1363 let handler_low = TypedHandler::from_with_id("low-priority", move |_: &QuoteTick| {
1364 order1.borrow_mut().push("low");
1365 });
1366
1367 let order2 = order.clone();
1368 let handler_high = TypedHandler::from_with_id("high-priority", move |_: &QuoteTick| {
1369 order2.borrow_mut().push("high");
1370 });
1371
1372 subscribe_quotes("data.quotes.PRIO.*".into(), handler_low, Some(1));
1373 subscribe_quotes("data.quotes.PRIO.*".into(), handler_high, Some(10));
1374
1375 let quote = QuoteTick::default();
1376 publish_quote("data.quotes.PRIO.TEST".into(), "e);
1377
1378 assert_eq!(*order.borrow(), vec!["high", "low"]);
1379 }
1380
1381 #[rstest]
1382 fn test_typed_routing_isolation() {
1383 let _msgbus = get_message_bus();
1384 let quote_received = Rc::new(RefCell::new(false));
1385 let trade_received = Rc::new(RefCell::new(false));
1386
1387 let qr = quote_received.clone();
1388 let quote_handler = TypedHandler::from(move |_: &QuoteTick| {
1389 *qr.borrow_mut() = true;
1390 });
1391
1392 let tr = trade_received.clone();
1393 let trade_handler = TypedHandler::from(move |_: &TradeTick| {
1394 *tr.borrow_mut() = true;
1395 });
1396
1397 subscribe_quotes("data.iso.*".into(), quote_handler, None);
1398 subscribe_trades("data.iso.*".into(), trade_handler, None);
1399
1400 let quote = QuoteTick::default();
1401 publish_quote("data.iso.TEST".into(), "e);
1402
1403 assert!(*quote_received.borrow());
1404 assert!(!*trade_received.borrow());
1405 }
1406
1407 #[rstest]
1408 fn test_send_data_allows_reentrant_topic_access() {
1409 use crate::msgbus::switchboard::get_quotes_topic;
1410
1411 let _msgbus = get_message_bus();
1412 let topic_retrieved = Rc::new(RefCell::new(false));
1413 let topic_clone = topic_retrieved.clone();
1414
1415 let handler = TypedIntoHandler::from(move |data: Data| {
1416 let instrument_id = data.instrument_id();
1417 let _topic = get_quotes_topic(instrument_id);
1418 *topic_clone.borrow_mut() = true;
1419 });
1420
1421 let endpoint: MStr<Endpoint> = "ReentrantTest.data".into();
1422 register_data_endpoint(endpoint, handler);
1423
1424 let quote = QuoteTick::default();
1425 send_data(endpoint, Data::Quote(quote));
1426
1427 assert!(*topic_retrieved.borrow());
1428 }
1429
1430 #[rstest]
1431 fn test_send_trading_command_allows_reentrant_topic_access() {
1432 use nautilus_model::{
1433 enums::OrderSide,
1434 identifiers::{StrategyId, TraderId},
1435 };
1436
1437 use crate::{
1438 messages::execution::{TradingCommand, cancel::CancelAllOrders},
1439 msgbus::switchboard::get_trades_topic,
1440 };
1441
1442 let _msgbus = get_message_bus();
1443 let topic_retrieved = Rc::new(RefCell::new(false));
1444 let topic_clone = topic_retrieved.clone();
1445
1446 let handler = TypedIntoHandler::from(move |cmd: TradingCommand| {
1447 let instrument_id = cmd.instrument_id();
1448 let _topic = get_trades_topic(instrument_id);
1449 *topic_clone.borrow_mut() = true;
1450 });
1451
1452 let endpoint: MStr<Endpoint> = "ReentrantTest.tradingCmd".into();
1453 register_trading_command_endpoint(endpoint, handler);
1454
1455 let cmd = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1456 TraderId::new("TESTER-001"),
1457 None,
1458 StrategyId::new("S-001"),
1459 InstrumentId::from("TEST.VENUE"),
1460 OrderSide::NoOrderSide,
1461 UUID4::new(),
1462 0.into(),
1463 None,
1464 ));
1465 send_trading_command(endpoint, cmd);
1466
1467 assert!(*topic_retrieved.borrow());
1468 }
1469
1470 #[rstest]
1471 fn test_send_account_state_allows_reentrant_topic_access() {
1472 use nautilus_model::{enums::AccountType, identifiers::AccountId, types::Currency};
1473
1474 use crate::msgbus::switchboard::get_quotes_topic;
1475
1476 let _msgbus = get_message_bus();
1477 let topic_retrieved = Rc::new(RefCell::new(false));
1478 let topic_clone = topic_retrieved.clone();
1479
1480 let handler = TypedHandler::from(move |_state: &AccountState| {
1481 let instrument_id = InstrumentId::from("TEST.VENUE");
1482 let _topic = get_quotes_topic(instrument_id);
1483 *topic_clone.borrow_mut() = true;
1484 });
1485
1486 let endpoint: MStr<Endpoint> = "ReentrantTest.accountState".into();
1487 register_account_state_endpoint(endpoint, handler);
1488
1489 let state = AccountState::new(
1490 AccountId::new("SIM-001"),
1491 AccountType::Cash,
1492 vec![],
1493 vec![],
1494 true,
1495 UUID4::new(),
1496 0.into(),
1497 0.into(),
1498 Some(Currency::USD()),
1499 );
1500 send_account_state(endpoint, &state);
1501
1502 assert!(*topic_retrieved.borrow());
1503 }
1504
1505 #[rstest]
1506 fn test_send_order_event_allows_reentrant_topic_access() {
1507 use nautilus_model::{
1508 events::OrderDenied,
1509 identifiers::{ClientOrderId, StrategyId, TraderId},
1510 };
1511
1512 use crate::msgbus::switchboard::get_quotes_topic;
1513
1514 let _msgbus = get_message_bus();
1515 let topic_retrieved = Rc::new(RefCell::new(false));
1516 let topic_clone = topic_retrieved.clone();
1517
1518 let handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1519 let instrument_id = InstrumentId::from("TEST.VENUE");
1520 let _topic = get_quotes_topic(instrument_id);
1521 *topic_clone.borrow_mut() = true;
1522 });
1523
1524 let endpoint: MStr<Endpoint> = "ReentrantTest.orderEvent".into();
1525 register_order_event_endpoint(endpoint, handler);
1526
1527 let event = OrderEventAny::Denied(OrderDenied::new(
1528 TraderId::new("TESTER-001"),
1529 StrategyId::new("S-001"),
1530 InstrumentId::from("TEST.VENUE"),
1531 ClientOrderId::new("O-001"),
1532 "test denied".into(),
1533 UUID4::new(),
1534 0.into(),
1535 0.into(),
1536 ));
1537 send_order_event(endpoint, event);
1538
1539 assert!(*topic_retrieved.borrow());
1540 }
1541
1542 #[rstest]
1543 fn test_send_data_command_allows_reentrant_topic_access() {
1544 use nautilus_model::identifiers::ClientId;
1545
1546 use crate::{
1547 messages::data::{DataCommand, SubscribeCommand, SubscribeQuotes},
1548 msgbus::switchboard::get_trades_topic,
1549 };
1550
1551 let _msgbus = get_message_bus();
1552 let topic_retrieved = Rc::new(RefCell::new(false));
1553 let topic_clone = topic_retrieved.clone();
1554
1555 let handler = TypedIntoHandler::from(move |_cmd: DataCommand| {
1556 let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
1557 *topic_clone.borrow_mut() = true;
1558 });
1559
1560 let endpoint: MStr<Endpoint> = "ReentrantTest.dataCmd".into();
1561 register_data_command_endpoint(endpoint, handler);
1562
1563 let cmd = DataCommand::Subscribe(SubscribeCommand::Quotes(SubscribeQuotes::new(
1564 InstrumentId::from("TEST.VENUE"),
1565 Some(ClientId::new("SIM")),
1566 None,
1567 UUID4::new(),
1568 0.into(),
1569 None,
1570 None,
1571 )));
1572 send_data_command(endpoint, cmd);
1573
1574 assert!(*topic_retrieved.borrow());
1575 }
1576
1577 #[rstest]
1578 fn test_send_data_response_allows_reentrant_topic_access() {
1579 use nautilus_model::identifiers::ClientId;
1580
1581 use crate::{
1582 messages::data::{DataResponse, QuotesResponse},
1583 msgbus::switchboard::get_quotes_topic,
1584 };
1585
1586 let _msgbus = get_message_bus();
1587 let topic_retrieved = Rc::new(RefCell::new(false));
1588 let topic_clone = topic_retrieved.clone();
1589
1590 let handler = TypedIntoHandler::from(move |_resp: DataResponse| {
1591 let _topic = get_quotes_topic(InstrumentId::from("TEST.VENUE"));
1592 *topic_clone.borrow_mut() = true;
1593 });
1594
1595 let endpoint: MStr<Endpoint> = "ReentrantTest.dataResp".into();
1596 register_data_response_endpoint(endpoint, handler);
1597
1598 let resp = DataResponse::Quotes(QuotesResponse {
1599 correlation_id: UUID4::new(),
1600 client_id: ClientId::new("SIM"),
1601 instrument_id: InstrumentId::from("TEST.VENUE"),
1602 data: vec![],
1603 start: None,
1604 end: None,
1605 ts_init: 0.into(),
1606 params: None,
1607 });
1608 send_data_response(endpoint, resp);
1609
1610 assert!(*topic_retrieved.borrow());
1611 }
1612
1613 #[rstest]
1614 fn test_send_execution_report_allows_reentrant_topic_access() {
1615 use nautilus_model::{
1616 identifiers::{AccountId, ClientId, Venue},
1617 reports::ExecutionMassStatus,
1618 };
1619
1620 use crate::{messages::execution::ExecutionReport, msgbus::switchboard::get_trades_topic};
1621
1622 let _msgbus = get_message_bus();
1623 let topic_retrieved = Rc::new(RefCell::new(false));
1624 let topic_clone = topic_retrieved.clone();
1625
1626 let handler = TypedIntoHandler::from(move |_report: ExecutionReport| {
1627 let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
1628 *topic_clone.borrow_mut() = true;
1629 });
1630
1631 let endpoint: MStr<Endpoint> = "ReentrantTest.execReport".into();
1632 register_execution_report_endpoint(endpoint, handler);
1633
1634 let report = ExecutionReport::MassStatus(Box::new(ExecutionMassStatus::new(
1635 ClientId::new("SIM"),
1636 AccountId::new("SIM-001"),
1637 Venue::new("TEST"),
1638 0.into(),
1639 None,
1640 )));
1641 send_execution_report(endpoint, report);
1642
1643 assert!(*topic_retrieved.borrow());
1644 }
1645
1646 #[rstest]
1647 fn test_order_event_handler_can_send_trading_command() {
1648 let _msgbus = get_message_bus();
1652 let command_sent = Rc::new(RefCell::new(false));
1653 let command_sent_clone = command_sent.clone();
1654
1655 let cmd_received = Rc::new(RefCell::new(false));
1656 let cmd_received_clone = cmd_received.clone();
1657 let cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1658 *cmd_received_clone.borrow_mut() = true;
1659 });
1660 let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.execCmd".into();
1661 register_trading_command_endpoint(cmd_endpoint, cmd_handler);
1662
1663 let event_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1664 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1666 TraderId::new("TESTER-001"),
1667 None,
1668 StrategyId::new("S-001"),
1669 InstrumentId::from("TEST.VENUE"),
1670 OrderSide::Buy,
1671 UUID4::new(),
1672 0.into(),
1673 None,
1674 ));
1675 send_trading_command(cmd_endpoint, command);
1676 *command_sent_clone.borrow_mut() = true;
1677 });
1678
1679 let event_endpoint: MStr<Endpoint> = "ReentrantTest.orderEvt".into();
1680 register_order_event_endpoint(event_endpoint, event_handler);
1681
1682 let event = OrderEventAny::Denied(OrderDenied::new(
1683 TraderId::new("TESTER-001"),
1684 StrategyId::new("S-001"),
1685 InstrumentId::from("TEST.VENUE"),
1686 ClientOrderId::new("O-001"),
1687 "Test denial".into(),
1688 UUID4::new(),
1689 0.into(),
1690 0.into(),
1691 ));
1692 send_order_event(event_endpoint, event);
1693
1694 assert!(
1695 *command_sent.borrow(),
1696 "Order event handler should have run"
1697 );
1698 assert!(
1699 *cmd_received.borrow(),
1700 "Trading command should have been received"
1701 );
1702 }
1703
1704 #[rstest]
1705 fn test_data_handler_can_send_data_command() {
1706 let _msgbus = get_message_bus();
1709 let command_sent = Rc::new(RefCell::new(false));
1710 let command_sent_clone = command_sent.clone();
1711
1712 let cmd_received = Rc::new(RefCell::new(false));
1713 let cmd_received_clone = cmd_received.clone();
1714 let cmd_handler = TypedIntoHandler::from(move |_cmd: DataCommand| {
1715 *cmd_received_clone.borrow_mut() = true;
1716 });
1717 let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.dataCmd2".into();
1718 register_data_command_endpoint(cmd_endpoint, cmd_handler);
1719
1720 let data_handler = TypedIntoHandler::from(move |_data: Data| {
1721 let command = DataCommand::Subscribe(SubscribeCommand::Quotes(SubscribeQuotes::new(
1722 InstrumentId::from("TEST.VENUE"),
1723 Some(ClientId::new("SIM")),
1724 None,
1725 UUID4::new(),
1726 0.into(),
1727 None,
1728 None,
1729 )));
1730 send_data_command(cmd_endpoint, command);
1731 *command_sent_clone.borrow_mut() = true;
1732 });
1733
1734 let data_endpoint: MStr<Endpoint> = "ReentrantTest.data2".into();
1735 register_data_endpoint(data_endpoint, data_handler);
1736
1737 let quote = QuoteTick::default();
1738 send_data(data_endpoint, Data::Quote(quote));
1739
1740 assert!(*command_sent.borrow(), "Data handler should have run");
1741 assert!(
1742 *cmd_received.borrow(),
1743 "Data command should have been received"
1744 );
1745 }
1746
1747 #[rstest]
1748 fn test_trading_command_handler_can_send_order_event() {
1749 let _msgbus = get_message_bus();
1753 let event_sent = Rc::new(RefCell::new(false));
1754 let event_sent_clone = event_sent.clone();
1755
1756 let evt_received = Rc::new(RefCell::new(false));
1757 let evt_received_clone = evt_received.clone();
1758 let evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1759 *evt_received_clone.borrow_mut() = true;
1760 });
1761 let evt_endpoint: MStr<Endpoint> = "ReentrantTest.orderEvt2".into();
1762 register_order_event_endpoint(evt_endpoint, evt_handler);
1763
1764 let cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1765 let event = OrderEventAny::Denied(OrderDenied::new(
1766 TraderId::new("TESTER-001"),
1767 StrategyId::new("S-001"),
1768 InstrumentId::from("TEST.VENUE"),
1769 ClientOrderId::new("O-001"),
1770 "Test denial".into(),
1771 UUID4::new(),
1772 0.into(),
1773 0.into(),
1774 ));
1775 send_order_event(evt_endpoint, event);
1776 *event_sent_clone.borrow_mut() = true;
1777 });
1778
1779 let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.execCmd2".into();
1780 register_trading_command_endpoint(cmd_endpoint, cmd_handler);
1781
1782 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1783 TraderId::new("TESTER-001"),
1784 None,
1785 StrategyId::new("S-001"),
1786 InstrumentId::from("TEST.VENUE"),
1787 OrderSide::Buy,
1788 UUID4::new(),
1789 0.into(),
1790 None,
1791 ));
1792 send_trading_command(cmd_endpoint, command);
1793
1794 assert!(
1795 *event_sent.borrow(),
1796 "Trading command handler should have run"
1797 );
1798 assert!(
1799 *evt_received.borrow(),
1800 "Order event should have been received"
1801 );
1802 }
1803
1804 #[rstest]
1805 fn test_nested_reentrant_calls() {
1806 let _msgbus = get_message_bus();
1809 let call_depth = Rc::new(RefCell::new(0u32));
1810
1811 let final_received = Rc::new(RefCell::new(false));
1812 let final_received_clone = final_received.clone();
1813 let final_evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1814 *final_received_clone.borrow_mut() = true;
1815 });
1816 let final_evt_endpoint: MStr<Endpoint> = "ReentrantTest.finalEvt".into();
1817 register_order_event_endpoint(final_evt_endpoint, final_evt_handler);
1818
1819 let call_depth_clone2 = call_depth.clone();
1820 let mid_cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1821 *call_depth_clone2.borrow_mut() += 1;
1822 let event = OrderEventAny::Denied(OrderDenied::new(
1823 TraderId::new("TESTER-001"),
1824 StrategyId::new("S-001"),
1825 InstrumentId::from("TEST.VENUE"),
1826 ClientOrderId::new("O-002"),
1827 "Nested denial".into(),
1828 UUID4::new(),
1829 0.into(),
1830 0.into(),
1831 ));
1832 send_order_event(final_evt_endpoint, event);
1833 });
1834 let mid_cmd_endpoint: MStr<Endpoint> = "ReentrantTest.midCmd".into();
1835 register_trading_command_endpoint(mid_cmd_endpoint, mid_cmd_handler);
1836
1837 let call_depth_clone1 = call_depth.clone();
1838 let init_evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1839 *call_depth_clone1.borrow_mut() += 1;
1840 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1841 TraderId::new("TESTER-001"),
1842 None,
1843 StrategyId::new("S-001"),
1844 InstrumentId::from("TEST.VENUE"),
1845 OrderSide::Buy,
1846 UUID4::new(),
1847 0.into(),
1848 None,
1849 ));
1850 send_trading_command(mid_cmd_endpoint, command);
1851 });
1852 let init_evt_endpoint: MStr<Endpoint> = "ReentrantTest.initEvt".into();
1853 register_order_event_endpoint(init_evt_endpoint, init_evt_handler);
1854
1855 let event = OrderEventAny::Denied(OrderDenied::new(
1856 TraderId::new("TESTER-001"),
1857 StrategyId::new("S-001"),
1858 InstrumentId::from("TEST.VENUE"),
1859 ClientOrderId::new("O-001"),
1860 "Initial denial".into(),
1861 UUID4::new(),
1862 0.into(),
1863 0.into(),
1864 ));
1865 send_order_event(init_evt_endpoint, event);
1866
1867 assert_eq!(
1868 *call_depth.borrow(),
1869 2,
1870 "Both intermediate handlers should have run"
1871 );
1872 assert!(
1873 *final_received.borrow(),
1874 "Final event handler should have received the event"
1875 );
1876 }
1877}