nautilus_common/msgbus/
stubs.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::RefCell,
19    rc::Rc,
20    sync::{
21        atomic::{AtomicBool, Ordering},
22        Arc,
23    },
24};
25
26use nautilus_core::message::Message;
27use nautilus_model::data::Data;
28use ustr::Ustr;
29use uuid::Uuid;
30
31use crate::{
32    messages::data::DataResponse,
33    msgbus::{handler::MessageHandler, ShareableMessageHandler},
34};
35
36// Stub message handler which logs the data it receives
37pub struct StubMessageHandler {
38    id: Ustr,
39    callback: Arc<dyn Fn(Message) + Send>,
40}
41
42impl MessageHandler for StubMessageHandler {
43    fn id(&self) -> Ustr {
44        self.id
45    }
46
47    fn handle(&self, message: &dyn Any) {
48        (self.callback)(message.downcast_ref::<Message>().unwrap().clone());
49    }
50
51    fn handle_response(&self, _resp: DataResponse) {}
52
53    fn handle_data(&self, _data: Data) {}
54
55    fn as_any(&self) -> &dyn Any {
56        self
57    }
58}
59
60#[must_use]
61#[allow(unused_must_use)] // TODO: Temporary to fix docs build
62pub fn get_stub_shareable_handler(id: Option<Ustr>) -> ShareableMessageHandler {
63    // TODO: This reduces the need to come up with ID strings in tests.
64    // In Python we do something like `hash((self.topic, str(self.handler)))` for the hash
65    // which includes the memory address, just went with a UUID4 here.
66    let unique_id = id.unwrap_or_else(|| Ustr::from(&Uuid::new_v4().to_string()));
67    ShareableMessageHandler(Rc::new(StubMessageHandler {
68        id: unique_id,
69        callback: Arc::new(|m: Message| {
70            format!("{m:?}");
71        }),
72    }))
73}
74
75// Stub message handler which checks if handle was called
76pub struct CallCheckMessageHandler {
77    id: Ustr,
78    called: Arc<AtomicBool>,
79}
80
81impl CallCheckMessageHandler {
82    #[must_use]
83    pub fn was_called(&self) -> bool {
84        self.called.load(Ordering::SeqCst)
85    }
86}
87
88impl MessageHandler for CallCheckMessageHandler {
89    fn id(&self) -> Ustr {
90        self.id
91    }
92
93    fn handle(&self, _message: &dyn Any) {
94        self.called.store(true, Ordering::SeqCst);
95    }
96
97    fn handle_response(&self, _resp: DataResponse) {}
98
99    fn handle_data(&self, _data: Data) {}
100
101    fn as_any(&self) -> &dyn Any {
102        self
103    }
104}
105
106#[must_use]
107pub fn get_call_check_shareable_handler(id: Option<Ustr>) -> ShareableMessageHandler {
108    // TODO: This reduces the need to come up with ID strings in tests.
109    // In Python we do something like `hash((self.topic, str(self.handler)))` for the hash
110    // which includes the memory address, just went with a UUID4 here.
111    let unique_id = id.unwrap_or_else(|| Ustr::from(&Uuid::new_v4().to_string()));
112    ShareableMessageHandler(Rc::new(CallCheckMessageHandler {
113        id: unique_id,
114        called: Arc::new(AtomicBool::new(false)),
115    }))
116}
117
118#[must_use]
119pub fn check_handler_was_called(call_check_handler: ShareableMessageHandler) -> bool {
120    call_check_handler
121        .0
122        .as_ref()
123        .as_any()
124        .downcast_ref::<CallCheckMessageHandler>()
125        .unwrap()
126        .was_called()
127}
128
129// Handler which saves the messages it receives
130#[derive(Debug, Clone)]
131pub struct MessageSavingHandler<T> {
132    id: Ustr,
133    messages: Rc<RefCell<Vec<T>>>,
134}
135
136impl<T: Clone + 'static> MessageSavingHandler<T> {
137    #[must_use]
138    pub fn get_messages(&self) -> Vec<T> {
139        self.messages.borrow().clone()
140    }
141}
142
143impl<T: Clone + 'static> MessageHandler for MessageSavingHandler<T> {
144    fn id(&self) -> Ustr {
145        self.id
146    }
147
148    fn handle(&self, message: &dyn Any) {
149        let mut messages = self.messages.borrow_mut();
150        match message.downcast_ref::<T>() {
151            Some(m) => messages.push(m.clone()),
152            None => panic!("MessageSavingHandler: message type mismatch {message:?}"),
153        }
154    }
155
156    fn handle_response(&self, _resp: DataResponse) {}
157
158    fn handle_data(&self, _data: Data) {}
159
160    fn as_any(&self) -> &dyn Any {
161        self
162    }
163}
164
165#[must_use]
166pub fn get_message_saving_handler<T: Clone + 'static>(id: Option<Ustr>) -> ShareableMessageHandler {
167    // TODO: This reduces the need to come up with ID strings in tests.
168    // In Python we do something like `hash((self.topic, str(self.handler)))` for the hash
169    // which includes the memory address, just went with a UUID4 here.
170    let unique_id = id.unwrap_or_else(|| Ustr::from(&Uuid::new_v4().to_string()));
171    ShareableMessageHandler(Rc::new(MessageSavingHandler::<T> {
172        id: unique_id,
173        messages: Rc::new(RefCell::new(Vec::new())),
174    }))
175}
176
177#[must_use]
178pub fn get_saved_messages<T: Clone + 'static>(handler: ShareableMessageHandler) -> Vec<T> {
179    handler
180        .0
181        .as_ref()
182        .as_any()
183        .downcast_ref::<MessageSavingHandler<T>>()
184        .unwrap()
185        .get_messages()
186}