nautilus_common/msgbus/
stubs.rs1use std::{
17 any::Any,
18 cell::RefCell,
19 fmt::Debug,
20 rc::Rc,
21 sync::{
22 Arc,
23 atomic::{AtomicBool, Ordering},
24 },
25};
26
27use nautilus_core::{UUID4, message::Message};
28use ustr::Ustr;
29
30use crate::msgbus::{ShareableMessageHandler, handler::MessageHandler};
31
32pub struct StubMessageHandler {
34 id: Ustr,
35 callback: Arc<dyn Fn(Message) + Send>,
36}
37
38impl Debug for StubMessageHandler {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct(stringify!(StubMessageHandler))
41 .field("id", &self.id)
42 .finish()
43 }
44}
45
46impl MessageHandler for StubMessageHandler {
47 fn id(&self) -> Ustr {
48 self.id
49 }
50
51 fn handle(&self, message: &dyn Any) {
52 (self.callback)(message.downcast_ref::<Message>().unwrap().clone());
53 }
54
55 fn as_any(&self) -> &dyn Any {
56 self
57 }
58}
59
60#[must_use]
61#[allow(unused_must_use)]
62pub fn get_stub_shareable_handler(id: Option<Ustr>) -> ShareableMessageHandler {
63 let unique_id = id.unwrap_or_else(|| Ustr::from(UUID4::new().as_str()));
67 ShareableMessageHandler(Rc::new(StubMessageHandler {
68 id: unique_id,
69 callback: Arc::new(|m: Message| {
70 format!("{m:?}");
71 }),
72 }))
73}
74
75#[derive(Debug)]
77pub struct CallCheckMessageHandler {
78 id: Ustr,
79 called: Arc<AtomicBool>,
80}
81
82impl CallCheckMessageHandler {
83 #[must_use]
84 pub fn was_called(&self) -> bool {
85 self.called.load(Ordering::SeqCst)
86 }
87}
88
89impl MessageHandler for CallCheckMessageHandler {
90 fn id(&self) -> Ustr {
91 self.id
92 }
93
94 fn handle(&self, _message: &dyn Any) {
95 self.called.store(true, Ordering::SeqCst);
96 }
97
98 fn as_any(&self) -> &dyn Any {
99 self
100 }
101}
102
103#[must_use]
104pub fn get_call_check_shareable_handler(id: Option<Ustr>) -> ShareableMessageHandler {
105 let unique_id = id.unwrap_or_else(|| Ustr::from(UUID4::new().as_str()));
109 ShareableMessageHandler(Rc::new(CallCheckMessageHandler {
110 id: unique_id,
111 called: Arc::new(AtomicBool::new(false)),
112 }))
113}
114
115#[must_use]
121pub fn check_handler_was_called(call_check_handler: ShareableMessageHandler) -> bool {
122 call_check_handler
123 .0
124 .as_ref()
125 .as_any()
126 .downcast_ref::<CallCheckMessageHandler>()
127 .unwrap()
128 .was_called()
129}
130
131#[derive(Debug, Clone)]
133pub struct MessageSavingHandler<T> {
134 id: Ustr,
135 messages: Rc<RefCell<Vec<T>>>,
136}
137
138impl<T: Clone + 'static> MessageSavingHandler<T> {
139 #[must_use]
140 pub fn get_messages(&self) -> Vec<T> {
141 self.messages.borrow().clone()
142 }
143}
144
145impl<T: Clone + 'static> MessageHandler for MessageSavingHandler<T> {
146 fn id(&self) -> Ustr {
147 self.id
148 }
149
150 fn handle(&self, message: &dyn Any) {
156 let mut messages = self.messages.borrow_mut();
157 match message.downcast_ref::<T>() {
158 Some(m) => messages.push(m.clone()),
159 None => panic!("MessageSavingHandler: message type mismatch {message:?}"),
160 }
161 }
162
163 fn as_any(&self) -> &dyn Any {
164 self
165 }
166}
167
168#[must_use]
169pub fn get_message_saving_handler<T: Clone + 'static>(id: Option<Ustr>) -> ShareableMessageHandler {
170 let unique_id = id.unwrap_or_else(|| Ustr::from(UUID4::new().as_str()));
174 ShareableMessageHandler(Rc::new(MessageSavingHandler::<T> {
175 id: unique_id,
176 messages: Rc::new(RefCell::new(Vec::new())),
177 }))
178}
179
180#[must_use]
186pub fn get_saved_messages<T: Clone + 'static>(handler: ShareableMessageHandler) -> Vec<T> {
187 handler
188 .0
189 .as_ref()
190 .as_any()
191 .downcast_ref::<MessageSavingHandler<T>>()
192 .unwrap()
193 .get_messages()
194}
195
196pub fn clear_saved_messages<T: Clone + 'static>(handler: ShareableMessageHandler) {
202 handler
203 .0
204 .as_ref()
205 .as_any()
206 .downcast_ref::<MessageSavingHandler<T>>()
207 .unwrap()
208 .messages
209 .borrow_mut()
210 .clear();
211}