1use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19 messages::{DataEvent, data::DataCommand},
20 msgbus::{self, switchboard::MessagingSwitchboard},
21 runner::{
22 DataCommandSender, TimeEventSender, set_data_cmd_sender, set_data_event_sender,
23 set_time_event_sender,
24 },
25 timer::TimeEventHandlerV2,
26};
27use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
28
29#[derive(Debug)]
31pub struct AsyncDataCommandSender {
32 cmd_tx: UnboundedSender<DataCommand>,
33}
34
35impl AsyncDataCommandSender {
36 #[must_use]
37 pub const fn new(cmd_tx: UnboundedSender<DataCommand>) -> Self {
38 Self { cmd_tx }
39 }
40}
41
42impl DataCommandSender for AsyncDataCommandSender {
43 fn execute(&self, command: DataCommand) {
44 if let Err(e) = self.cmd_tx.send(command) {
45 log::error!("Failed to send data command: {e}");
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
52pub struct AsyncTimeEventSender {
53 time_tx: UnboundedSender<TimeEventHandlerV2>,
54}
55
56impl AsyncTimeEventSender {
57 #[must_use]
58 pub const fn new(time_tx: UnboundedSender<TimeEventHandlerV2>) -> Self {
59 Self { time_tx }
60 }
61
62 #[must_use]
67 pub fn get_channel_sender(&self) -> UnboundedSender<TimeEventHandlerV2> {
68 self.time_tx.clone()
69 }
70}
71
72impl TimeEventSender for AsyncTimeEventSender {
73 fn send(&self, handler: TimeEventHandlerV2) {
74 if let Err(e) = self.time_tx.send(handler) {
75 log::error!("Failed to send time event handler: {e}");
76 }
77 }
78}
79
/// A runner that is driven through a single `run` entry point.
///
/// NOTE(review): no implementor of this trait is visible in this part of the file;
/// `AsyncRunner::run` is an inherent `async fn`, not an implementation of this trait.
pub trait Runner {
    /// Runs the runner, with mutable access to its state.
    fn run(&mut self);
}
83
/// State of the async event loop: the receiving halves of the channels serviced by
/// `AsyncRunner::run`, plus the sender used by `AsyncRunner::stop` to request shutdown.
pub struct AsyncRunner {
    /// Incoming data events, forwarded to the data engine endpoints by `run`.
    data_rx: UnboundedReceiver<DataEvent>,
    /// Incoming data commands, forwarded to the data engine execute endpoint by `run`.
    cmd_rx: UnboundedReceiver<DataCommand>,
    /// Incoming time event handlers, each run in place by `run`.
    time_rx: UnboundedReceiver<TimeEventHandlerV2>,
    /// Receives the shutdown signal that makes `run` return.
    signal_rx: UnboundedReceiver<()>,
    /// Sender paired with `signal_rx`; `stop` sends `()` on it.
    signal_tx: UnboundedSender<()>,
}
91
92impl Default for AsyncRunner {
93 fn default() -> Self {
94 Self::new()
95 }
96}
97
98impl Debug for AsyncRunner {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct(stringify!(AsyncRunner)).finish()
101 }
102}
103
104impl AsyncRunner {
105 #[must_use]
106 pub fn new() -> Self {
107 let (data_tx, data_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
108 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
109 let (time_tx, time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
110 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
111
112 set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_tx)));
113 set_data_event_sender(data_tx);
114 set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(cmd_tx)));
115
116 Self {
117 data_rx,
118 cmd_rx,
119 time_rx,
120 signal_rx,
121 signal_tx,
122 }
123 }
124
125 pub fn stop(&self) {
127 if let Err(e) = self.signal_tx.send(()) {
128 log::error!("Failed to send shutdown signal: {e}");
129 }
130 }
131}
132
133impl AsyncRunner {
134 pub async fn run(&mut self) {
139 log::info!("Starting AsyncRunner");
140
141 let data_engine_process = MessagingSwitchboard::data_engine_process();
142 let data_engine_response = MessagingSwitchboard::data_engine_response();
143 let data_engine_execute = MessagingSwitchboard::data_engine_execute();
144
145 loop {
146 tokio::select! {
147 Some(event) = self.data_rx.recv() => {
148 match event {
149 DataEvent::Data(data) => msgbus::send_any(data_engine_process, &data),
150 DataEvent::Response(resp) => {
151 msgbus::send_any(data_engine_response, &resp);
152 }
153 #[cfg(feature = "defi")]
154 DataEvent::DeFi(data) => msgbus::send_any(data_engine_process, &data),
155 }
156 },
157 Some(handler) = self.time_rx.recv() => {
158 handler.run();
159 },
160 Some(cmd) = self.cmd_rx.recv() => {
161 msgbus::send_any(data_engine_execute, &cmd);
162 },
163 Some(()) = self.signal_rx.recv() => {
164 tracing::info!("AsyncRunner received signal, shutting down");
165 return; },
167 else => return, };
169 }
170 }
171}
172
// Unit tests for the channel-backed senders and the `AsyncRunner` event loop.
//
// The runner tests build `AsyncRunner` field by field from locally created channels
// rather than calling `AsyncRunner::new()` (presumably to avoid the sender registration
// that `new` performs - confirm).
#[cfg(test)]
mod tests {
    use std::{rc::Rc, time::Duration};

    use nautilus_common::{
        messages::data::{SubscribeCommand, SubscribeCustomData},
        timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
    };
    use nautilus_core::{UUID4, UnixNanos};
    use nautilus_model::{
        data::{Data, quote::QuoteTick},
        identifiers::{ClientId, InstrumentId},
        types::{Price, Quantity},
    };
    use rstest::rstest;
    use tokio::sync::mpsc;
    use ustr::Ustr;

    use super::*;

    /// Builds a fixed EUR/USD.SIM quote tick used as the payload for data events.
    fn test_quote() -> QuoteTick {
        QuoteTick {
            instrument_id: InstrumentId::from("EUR/USD.SIM"),
            bid_price: Price::from("1.10000"),
            ask_price: Price::from("1.10001"),
            bid_size: Quantity::from(1_000_000),
            ask_size: Quantity::from(1_000_000),
            ts_event: UnixNanos::default(),
            ts_init: UnixNanos::default(),
        }
    }

    /// The command sender can be constructed and its `Debug` output names the type.
    #[rstest]
    fn test_async_data_command_sender_creation() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let sender = AsyncDataCommandSender::new(tx);
        assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
    }

    /// The time event sender can be constructed and its `Debug` output names the type.
    #[rstest]
    fn test_async_time_event_sender_creation() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let sender = AsyncTimeEventSender::new(tx);
        assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
    }

    /// The sender returned by `get_channel_sender` can send a handler while the
    /// receiver (`_rx`) is still alive.
    #[rstest]
    fn test_async_time_event_sender_get_channel() {
        let (tx, _rx) = mpsc::unbounded_channel();
        let sender = AsyncTimeEventSender::new(tx);
        let channel = sender.get_channel_sender();

        let event = TimeEvent::new(
            Ustr::from("test"),
            UUID4::new(),
            UnixNanos::from(1),
            UnixNanos::from(2),
        );
        // No-op callback: only the channel send is under test.
        let callback = TimeEventCallback::from(Rc::new(|_: TimeEvent| {}) as Rc<dyn Fn(TimeEvent)>);
        let handler = TimeEventHandlerV2::new(event, callback);

        assert!(channel.send(handler).is_ok());
    }

    /// `execute` delivers the command to the receiving half of the channel.
    #[tokio::test]
    async fn test_async_data_command_sender_execute() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let sender = AsyncDataCommandSender::new(tx);

        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
            client_id: Some(ClientId::from("TEST")),
            venue: None,
            data_type: nautilus_model::data::DataType::new("QuoteTick", None),
            command_id: UUID4::new(),
            ts_init: UnixNanos::default(),
            params: None,
        }));

        sender.execute(command.clone());

        let received = rx.recv().await.unwrap();
        // Only `client_id` and `data_type` are compared between sent and received.
        match (received, command) {
            (
                DataCommand::Subscribe(SubscribeCommand::Data(r)),
                DataCommand::Subscribe(SubscribeCommand::Data(c)),
            ) => {
                assert_eq!(r.client_id, c.client_id);
                assert_eq!(r.data_type, c.data_type);
            }
            _ => panic!("Command mismatch"),
        }
    }

    /// `send` delivers the handler to the receiving half of the channel.
    #[tokio::test]
    async fn test_async_time_event_sender_send() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let sender = AsyncTimeEventSender::new(tx);

        let event = TimeEvent::new(
            Ustr::from("test"),
            UUID4::new(),
            UnixNanos::from(1),
            UnixNanos::from(2),
        );
        let callback = TimeEventCallback::from(Rc::new(|_: TimeEvent| {}) as Rc<dyn Fn(TimeEvent)>);
        let handler = TimeEventHandlerV2::new(event, callback);

        sender.send(handler);

        assert!(rx.recv().await.is_some());
    }

    /// A spawned runner returns within 100 ms of a shutdown signal being sent.
    #[tokio::test]
    async fn test_runner_shutdown_signal() {
        // The unused senders are bound to `_`-prefixed names so they stay alive for the
        // whole test.
        let (_data_tx, data_rx) = mpsc::unbounded_channel::<DataEvent>();
        let (_cmd_tx, cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
        let (_time_tx, time_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
        let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();

        let mut runner = AsyncRunner {
            data_rx,
            cmd_rx,
            time_rx,
            signal_rx,
            signal_tx: signal_tx.clone(),
        };

        // Run the event loop in its own task.
        let runner_handle = tokio::spawn(async move {
            runner.run().await;
        });

        // Request shutdown.
        signal_tx.send(()).unwrap();

        // The task must finish before the timeout elapses.
        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
        assert!(result.is_ok(), "Runner should stop on signal");
    }

    /// Dropping the data sender and then signalling still lets the runner finish.
    ///
    /// Only `data_tx` is dropped; the command, time and signal senders stay alive, and a
    /// shutdown signal is sent as well. The test therefore passes whether the runner
    /// exits because of channel closure or because of the signal.
    #[tokio::test]
    async fn test_runner_closes_on_channel_drop() {
        let (data_tx, data_rx) = mpsc::unbounded_channel::<DataEvent>();
        let (_cmd_tx, cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
        let (_time_tx, time_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
        let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();

        let mut runner = AsyncRunner {
            data_rx,
            cmd_rx,
            time_rx,
            signal_rx,
            signal_tx: signal_tx.clone(),
        };

        // Run the event loop in its own task.
        let runner_handle = tokio::spawn(async move {
            runner.run().await;
        });

        // Close the data channel from the sending side.
        drop(data_tx);

        // Give the runner a moment, then signal; a failed send is deliberately ignored.
        tokio::time::sleep(Duration::from_millis(50)).await;
        signal_tx.send(()).ok();

        // The task must finish before the timeout elapses.
        let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
        assert!(
            result.is_ok(),
            "Runner should stop when channels close or on signal"
        );
    }

    /// Five producer tasks each send 20 quote events while the runner is running.
    ///
    /// The final timeout result is discarded, so this test fails only if one of the
    /// `unwrap` calls panics (a failed send or a panicked producer task).
    #[tokio::test]
    async fn test_concurrent_event_sending() {
        let (data_tx, data_rx) = mpsc::unbounded_channel::<DataEvent>();
        let (_cmd_tx, cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
        let (_time_tx, time_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
        let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();

        let mut runner = AsyncRunner {
            data_rx,
            cmd_rx,
            time_rx,
            signal_rx,
            signal_tx: signal_tx.clone(),
        };

        // Spawn the producers; each yields after every send.
        let mut handles = vec![];
        for _ in 0..5 {
            let tx_clone = data_tx.clone();
            let handle = tokio::spawn(async move {
                for _ in 0..20 {
                    let quote = test_quote();
                    tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
                    tokio::task::yield_now().await;
                }
            });
            handles.push(handle);
        }

        // Run the event loop in its own task.
        let runner_handle = tokio::spawn(async move {
            runner.run().await;
        });

        // Wait for every producer to finish.
        for handle in handles {
            handle.await.unwrap();
        }

        // Give the runner a moment with the queued events.
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Request shutdown.
        signal_tx.send(()).unwrap();

        // Wait up to one second for the runner task; the outcome is not asserted.
        let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
    }

    /// Sends `count` events on an unbounded channel and checks that all are received.
    ///
    /// Despite the name, nothing is timed here.
    #[rstest]
    #[case(10)]
    #[case(100)]
    #[case(1000)]
    fn test_channel_send_performance(#[case] count: usize) {
        let (tx, mut rx) = mpsc::unbounded_channel::<DataEvent>();
        let quote = test_quote();

        // Queue all events first.
        for _ in 0..count {
            tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
        }

        // Drain without awaiting; `try_recv` stops returning `Ok` once the queue is empty.
        let mut received = 0;
        while rx.try_recv().is_ok() {
            received += 1;
        }

        assert_eq!(received, count);
    }
}