nautilus_live/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19    messages::{DataEvent, data::DataCommand},
20    msgbus::{self, switchboard::MessagingSwitchboard},
21    runner::{
22        DataCommandSender, TimeEventSender, set_data_cmd_sender, set_data_event_sender,
23        set_time_event_sender,
24    },
25    timer::TimeEventHandlerV2,
26};
27use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
28
29/// Asynchronous implementation of `DataCommandSender` for live environments.
30#[derive(Debug)]
31pub struct AsyncDataCommandSender {
32    cmd_tx: UnboundedSender<DataCommand>,
33}
34
35impl AsyncDataCommandSender {
36    #[must_use]
37    pub const fn new(cmd_tx: UnboundedSender<DataCommand>) -> Self {
38        Self { cmd_tx }
39    }
40}
41
42impl DataCommandSender for AsyncDataCommandSender {
43    fn execute(&self, command: DataCommand) {
44        if let Err(e) = self.cmd_tx.send(command) {
45            log::error!("Failed to send data command: {e}");
46        }
47    }
48}
49
50/// Asynchronous implementation of `TimeEventSender` for live environments.
51#[derive(Debug, Clone)]
52pub struct AsyncTimeEventSender {
53    time_tx: UnboundedSender<TimeEventHandlerV2>,
54}
55
56impl AsyncTimeEventSender {
57    #[must_use]
58    pub const fn new(time_tx: UnboundedSender<TimeEventHandlerV2>) -> Self {
59        Self { time_tx }
60    }
61
62    /// Gets a clone of the underlying channel sender for async use.
63    ///
64    /// This allows async contexts to get a direct channel sender that
65    /// can be moved into async tasks without `RefCell` borrowing issues.
66    #[must_use]
67    pub fn get_channel_sender(&self) -> UnboundedSender<TimeEventHandlerV2> {
68        self.time_tx.clone()
69    }
70}
71
72impl TimeEventSender for AsyncTimeEventSender {
73    fn send(&self, handler: TimeEventHandlerV2) {
74        if let Err(e) = self.time_tx.send(handler) {
75            log::error!("Failed to send time event handler: {e}");
76        }
77    }
78}
79
80pub trait Runner {
81    fn run(&mut self);
82}
83
84pub struct AsyncRunner {
85    data_rx: UnboundedReceiver<DataEvent>,
86    cmd_rx: UnboundedReceiver<DataCommand>,
87    time_rx: UnboundedReceiver<TimeEventHandlerV2>,
88    signal_rx: UnboundedReceiver<()>,
89    signal_tx: UnboundedSender<()>,
90}
91
92impl Default for AsyncRunner {
93    fn default() -> Self {
94        Self::new()
95    }
96}
97
98impl Debug for AsyncRunner {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.debug_struct(stringify!(AsyncRunner)).finish()
101    }
102}
103
104impl AsyncRunner {
105    #[must_use]
106    pub fn new() -> Self {
107        let (data_tx, data_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
108        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
109        let (time_tx, time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
110        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
111
112        set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_tx)));
113        set_data_event_sender(data_tx);
114        set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(cmd_tx)));
115
116        Self {
117            data_rx,
118            cmd_rx,
119            time_rx,
120            signal_rx,
121            signal_tx,
122        }
123    }
124
125    /// Stops the runner with an internal shutdown signal.
126    pub fn stop(&self) {
127        if let Err(e) = self.signal_tx.send(()) {
128            log::error!("Failed to send shutdown signal: {e}");
129        }
130    }
131}
132
133impl AsyncRunner {
134    /// Runs the async runner event loop.
135    ///
136    /// This method processes data events, time events, and signal events in an async loop.
137    /// It will run until a signal is received or the event streams are closed.
138    pub async fn run(&mut self) {
139        log::info!("Starting AsyncRunner");
140
141        let data_engine_process = MessagingSwitchboard::data_engine_process();
142        let data_engine_response = MessagingSwitchboard::data_engine_response();
143        let data_engine_execute = MessagingSwitchboard::data_engine_execute();
144
145        loop {
146            tokio::select! {
147                Some(event) = self.data_rx.recv() => {
148                    match event {
149                        DataEvent::Data(data) => msgbus::send_any(data_engine_process, &data),
150                        DataEvent::Response(resp) => {
151                            msgbus::send_any(data_engine_response, &resp);
152                        }
153                        #[cfg(feature = "defi")]
154                        DataEvent::DeFi(data) => msgbus::send_any(data_engine_process, &data),
155                    }
156                },
157                Some(handler) = self.time_rx.recv() => {
158                    handler.run();
159                },
160                Some(cmd) = self.cmd_rx.recv() => {
161                    msgbus::send_any(data_engine_execute, &cmd);
162                },
163                Some(()) = self.signal_rx.recv() => {
164                    tracing::info!("AsyncRunner received signal, shutting down");
165                    return; // Signal to stop
166                },
167                else => return, // Sentinel event ends run
168            };
169        }
170    }
171}
172
173////////////////////////////////////////////////////////////////////////////////
174// Tests
175////////////////////////////////////////////////////////////////////////////////
176
177#[cfg(test)]
178mod tests {
179    use std::{rc::Rc, time::Duration};
180
181    use nautilus_common::{
182        messages::data::{SubscribeCommand, SubscribeCustomData},
183        timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
184    };
185    use nautilus_core::{UUID4, UnixNanos};
186    use nautilus_model::{
187        data::{Data, quote::QuoteTick},
188        identifiers::{ClientId, InstrumentId},
189        types::{Price, Quantity},
190    };
191    use rstest::rstest;
192    use tokio::sync::mpsc;
193    use ustr::Ustr;
194
195    use super::*;
196
197    // Test fixture for creating test quotes
198    fn test_quote() -> QuoteTick {
199        QuoteTick {
200            instrument_id: InstrumentId::from("EUR/USD.SIM"),
201            bid_price: Price::from("1.10000"),
202            ask_price: Price::from("1.10001"),
203            bid_size: Quantity::from(1_000_000),
204            ask_size: Quantity::from(1_000_000),
205            ts_event: UnixNanos::default(),
206            ts_init: UnixNanos::default(),
207        }
208    }
209
210    #[rstest]
211    fn test_async_data_command_sender_creation() {
212        let (tx, _rx) = mpsc::unbounded_channel();
213        let sender = AsyncDataCommandSender::new(tx);
214        assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
215    }
216
217    #[rstest]
218    fn test_async_time_event_sender_creation() {
219        let (tx, _rx) = mpsc::unbounded_channel();
220        let sender = AsyncTimeEventSender::new(tx);
221        assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
222    }
223
224    #[rstest]
225    fn test_async_time_event_sender_get_channel() {
226        let (tx, _rx) = mpsc::unbounded_channel();
227        let sender = AsyncTimeEventSender::new(tx);
228        let channel = sender.get_channel_sender();
229
230        // Verify the channel is functional
231        let event = TimeEvent::new(
232            Ustr::from("test"),
233            UUID4::new(),
234            UnixNanos::from(1),
235            UnixNanos::from(2),
236        );
237        let callback = TimeEventCallback::from(Rc::new(|_: TimeEvent| {}) as Rc<dyn Fn(TimeEvent)>);
238        let handler = TimeEventHandlerV2::new(event, callback);
239
240        assert!(channel.send(handler).is_ok());
241    }
242
243    #[tokio::test]
244    async fn test_async_data_command_sender_execute() {
245        let (tx, mut rx) = mpsc::unbounded_channel();
246        let sender = AsyncDataCommandSender::new(tx);
247
248        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
249            client_id: Some(ClientId::from("TEST")),
250            venue: None,
251            data_type: nautilus_model::data::DataType::new("QuoteTick", None),
252            command_id: UUID4::new(),
253            ts_init: UnixNanos::default(),
254            params: None,
255        }));
256
257        sender.execute(command.clone());
258
259        let received = rx.recv().await.unwrap();
260        match (received, command) {
261            (
262                DataCommand::Subscribe(SubscribeCommand::Data(r)),
263                DataCommand::Subscribe(SubscribeCommand::Data(c)),
264            ) => {
265                assert_eq!(r.client_id, c.client_id);
266                assert_eq!(r.data_type, c.data_type);
267            }
268            _ => panic!("Command mismatch"),
269        }
270    }
271
272    #[tokio::test]
273    async fn test_async_time_event_sender_send() {
274        let (tx, mut rx) = mpsc::unbounded_channel();
275        let sender = AsyncTimeEventSender::new(tx);
276
277        let event = TimeEvent::new(
278            Ustr::from("test"),
279            UUID4::new(),
280            UnixNanos::from(1),
281            UnixNanos::from(2),
282        );
283        let callback = TimeEventCallback::from(Rc::new(|_: TimeEvent| {}) as Rc<dyn Fn(TimeEvent)>);
284        let handler = TimeEventHandlerV2::new(event, callback);
285
286        sender.send(handler);
287
288        assert!(rx.recv().await.is_some());
289    }
290
291    #[tokio::test]
292    async fn test_runner_shutdown_signal() {
293        // Create runner with manual channels to avoid global state
294        let (_data_tx, data_rx) = mpsc::unbounded_channel::<DataEvent>();
295        let (_cmd_tx, cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
296        let (_time_tx, time_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
297        let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();
298
299        let mut runner = AsyncRunner {
300            data_rx,
301            cmd_rx,
302            time_rx,
303            signal_rx,
304            signal_tx: signal_tx.clone(),
305        };
306
307        // Start runner
308        let runner_handle = tokio::spawn(async move {
309            runner.run().await;
310        });
311
312        // Send shutdown signal
313        signal_tx.send(()).unwrap();
314
315        // Runner should stop quickly
316        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
317        assert!(result.is_ok(), "Runner should stop on signal");
318    }
319
320    #[tokio::test]
321    async fn test_runner_closes_on_channel_drop() {
322        let (data_tx, data_rx) = mpsc::unbounded_channel::<DataEvent>();
323        let (_cmd_tx, cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
324        let (_time_tx, time_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
325        let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();
326
327        let mut runner = AsyncRunner {
328            data_rx,
329            cmd_rx,
330            time_rx,
331            signal_rx,
332            signal_tx: signal_tx.clone(),
333        };
334
335        // Start runner
336        let runner_handle = tokio::spawn(async move {
337            runner.run().await;
338        });
339
340        // Drop data sender to close channel - this should cause runner to exit
341        drop(data_tx);
342
343        // Send stop signal to ensure clean shutdown
344        tokio::time::sleep(Duration::from_millis(50)).await;
345        signal_tx.send(()).ok();
346
347        // Runner should stop when channels close or on signal
348        let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
349        assert!(
350            result.is_ok(),
351            "Runner should stop when channels close or on signal"
352        );
353    }
354
355    #[tokio::test]
356    async fn test_concurrent_event_sending() {
357        let (data_tx, data_rx) = mpsc::unbounded_channel::<DataEvent>();
358        let (_cmd_tx, cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
359        let (_time_tx, time_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
360        let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();
361
362        // Setup runner
363        let mut runner = AsyncRunner {
364            data_rx,
365            cmd_rx,
366            time_rx,
367            signal_rx,
368            signal_tx: signal_tx.clone(),
369        };
370
371        // Spawn multiple concurrent senders
372        let mut handles = vec![];
373        for _ in 0..5 {
374            let tx_clone = data_tx.clone();
375            let handle = tokio::spawn(async move {
376                for _ in 0..20 {
377                    let quote = test_quote();
378                    tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
379                    tokio::task::yield_now().await;
380                }
381            });
382            handles.push(handle);
383        }
384
385        // Start runner in background
386        let runner_handle = tokio::spawn(async move {
387            runner.run().await;
388        });
389
390        // Wait for all senders
391        for handle in handles {
392            handle.await.unwrap();
393        }
394
395        // Give runner time to process
396        tokio::time::sleep(Duration::from_millis(50)).await;
397
398        // Stop runner
399        signal_tx.send(()).unwrap();
400
401        let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
402    }
403
404    #[rstest]
405    #[case(10)]
406    #[case(100)]
407    #[case(1000)]
408    fn test_channel_send_performance(#[case] count: usize) {
409        let (tx, mut rx) = mpsc::unbounded_channel::<DataEvent>();
410        let quote = test_quote();
411
412        // Send events
413        for _ in 0..count {
414            tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
415        }
416
417        // Verify all received
418        let mut received = 0;
419        while rx.try_recv().is_ok() {
420            received += 1;
421        }
422
423        assert_eq!(received, count);
424    }
425}