nautilus_common/msgbus/
stubs.rs1use std::{
17 any::Any,
18 cell::RefCell,
19 rc::Rc,
20 sync::{
21 atomic::{AtomicBool, Ordering},
22 Arc,
23 },
24};
25
26use nautilus_core::message::Message;
27use nautilus_model::data::Data;
28use ustr::Ustr;
29use uuid::Uuid;
30
31use crate::{
32 messages::data::DataResponse,
33 msgbus::{handler::MessageHandler, ShareableMessageHandler},
34};
35
36pub struct StubMessageHandler {
38 id: Ustr,
39 callback: Arc<dyn Fn(Message) + Send>,
40}
41
42impl MessageHandler for StubMessageHandler {
43 fn id(&self) -> Ustr {
44 self.id
45 }
46
47 fn handle(&self, message: &dyn Any) {
48 (self.callback)(message.downcast_ref::<Message>().unwrap().clone());
49 }
50
51 fn handle_response(&self, _resp: DataResponse) {}
52
53 fn handle_data(&self, _data: Data) {}
54
55 fn as_any(&self) -> &dyn Any {
56 self
57 }
58}
59
60#[must_use]
61#[allow(unused_must_use)] pub fn get_stub_shareable_handler(id: Option<Ustr>) -> ShareableMessageHandler {
63 let unique_id = id.unwrap_or_else(|| Ustr::from(&Uuid::new_v4().to_string()));
67 ShareableMessageHandler(Rc::new(StubMessageHandler {
68 id: unique_id,
69 callback: Arc::new(|m: Message| {
70 format!("{m:?}");
71 }),
72 }))
73}
74
75pub struct CallCheckMessageHandler {
77 id: Ustr,
78 called: Arc<AtomicBool>,
79}
80
81impl CallCheckMessageHandler {
82 #[must_use]
83 pub fn was_called(&self) -> bool {
84 self.called.load(Ordering::SeqCst)
85 }
86}
87
88impl MessageHandler for CallCheckMessageHandler {
89 fn id(&self) -> Ustr {
90 self.id
91 }
92
93 fn handle(&self, _message: &dyn Any) {
94 self.called.store(true, Ordering::SeqCst);
95 }
96
97 fn handle_response(&self, _resp: DataResponse) {}
98
99 fn handle_data(&self, _data: Data) {}
100
101 fn as_any(&self) -> &dyn Any {
102 self
103 }
104}
105
106#[must_use]
107pub fn get_call_check_shareable_handler(id: Option<Ustr>) -> ShareableMessageHandler {
108 let unique_id = id.unwrap_or_else(|| Ustr::from(&Uuid::new_v4().to_string()));
112 ShareableMessageHandler(Rc::new(CallCheckMessageHandler {
113 id: unique_id,
114 called: Arc::new(AtomicBool::new(false)),
115 }))
116}
117
118#[must_use]
119pub fn check_handler_was_called(call_check_handler: ShareableMessageHandler) -> bool {
120 call_check_handler
121 .0
122 .as_ref()
123 .as_any()
124 .downcast_ref::<CallCheckMessageHandler>()
125 .unwrap()
126 .was_called()
127}
128
129#[derive(Debug, Clone)]
131pub struct MessageSavingHandler<T> {
132 id: Ustr,
133 messages: Rc<RefCell<Vec<T>>>,
134}
135
136impl<T: Clone + 'static> MessageSavingHandler<T> {
137 #[must_use]
138 pub fn get_messages(&self) -> Vec<T> {
139 self.messages.borrow().clone()
140 }
141}
142
143impl<T: Clone + 'static> MessageHandler for MessageSavingHandler<T> {
144 fn id(&self) -> Ustr {
145 self.id
146 }
147
148 fn handle(&self, message: &dyn Any) {
149 let mut messages = self.messages.borrow_mut();
150 match message.downcast_ref::<T>() {
151 Some(m) => messages.push(m.clone()),
152 None => panic!("MessageSavingHandler: message type mismatch {message:?}"),
153 }
154 }
155
156 fn handle_response(&self, _resp: DataResponse) {}
157
158 fn handle_data(&self, _data: Data) {}
159
160 fn as_any(&self) -> &dyn Any {
161 self
162 }
163}
164
165#[must_use]
166pub fn get_message_saving_handler<T: Clone + 'static>(id: Option<Ustr>) -> ShareableMessageHandler {
167 let unique_id = id.unwrap_or_else(|| Ustr::from(&Uuid::new_v4().to_string()));
171 ShareableMessageHandler(Rc::new(MessageSavingHandler::<T> {
172 id: unique_id,
173 messages: Rc::new(RefCell::new(Vec::new())),
174 }))
175}
176
177#[must_use]
178pub fn get_saved_messages<T: Clone + 'static>(handler: ShareableMessageHandler) -> Vec<T> {
179 handler
180 .0
181 .as_ref()
182 .as_any()
183 .downcast_ref::<MessageSavingHandler<T>>()
184 .unwrap()
185 .get_messages()
186}