nautilus_live/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19    messages::{
20        DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
21    },
22    msgbus::{self, switchboard::MessagingSwitchboard},
23    runner::{
24        DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
25        set_data_event_sender, set_exec_cmd_sender, set_exec_event_sender, set_time_event_sender,
26    },
27    timer::TimeEventHandlerV2,
28};
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
30
31/// Asynchronous implementation of `DataCommandSender` for live environments.
32#[derive(Debug)]
33pub struct AsyncDataCommandSender {
34    cmd_tx: UnboundedSender<DataCommand>,
35}
36
37impl AsyncDataCommandSender {
38    #[must_use]
39    pub const fn new(cmd_tx: UnboundedSender<DataCommand>) -> Self {
40        Self { cmd_tx }
41    }
42}
43
44impl DataCommandSender for AsyncDataCommandSender {
45    fn execute(&self, command: DataCommand) {
46        if let Err(e) = self.cmd_tx.send(command) {
47            log::error!("Failed to send data command: {e}");
48        }
49    }
50}
51
52/// Asynchronous implementation of `TimeEventSender` for live environments.
53#[derive(Debug, Clone)]
54pub struct AsyncTimeEventSender {
55    time_tx: UnboundedSender<TimeEventHandlerV2>,
56}
57
58impl AsyncTimeEventSender {
59    #[must_use]
60    pub const fn new(time_tx: UnboundedSender<TimeEventHandlerV2>) -> Self {
61        Self { time_tx }
62    }
63
64    /// Gets a clone of the underlying channel sender for async use.
65    ///
66    /// This allows async contexts to get a direct channel sender that
67    /// can be moved into async tasks without `RefCell` borrowing issues.
68    #[must_use]
69    pub fn get_channel_sender(&self) -> UnboundedSender<TimeEventHandlerV2> {
70        self.time_tx.clone()
71    }
72}
73
74impl TimeEventSender for AsyncTimeEventSender {
75    fn send(&self, handler: TimeEventHandlerV2) {
76        if let Err(e) = self.time_tx.send(handler) {
77            log::error!("Failed to send time event handler: {e}");
78        }
79    }
80}
81
82/// Asynchronous implementation of `TradingCommandSender` for live environments.
83#[derive(Debug)]
84pub struct AsyncTradingCommandSender {
85    cmd_tx: UnboundedSender<TradingCommand>,
86}
87
88impl AsyncTradingCommandSender {
89    #[must_use]
90    pub const fn new(cmd_tx: UnboundedSender<TradingCommand>) -> Self {
91        Self { cmd_tx }
92    }
93}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96    fn execute(&self, command: TradingCommand) {
97        if let Err(e) = self.cmd_tx.send(command) {
98            log::error!("Failed to send trading command: {e}");
99        }
100    }
101}
102
/// Runner interface exposing a single non-async `run` method.
///
/// NOTE(review): no implementor of this trait is visible in this file; `AsyncRunner`
/// defines its own inherent `async fn run` instead. Confirm where this trait is used.
pub trait Runner {
    /// Runs the runner, with mutable access to its state.
    fn run(&mut self);
}
106
107pub struct AsyncRunner {
108    time_evt_rx: UnboundedReceiver<TimeEventHandlerV2>,
109    data_evt_rx: UnboundedReceiver<DataEvent>,
110    data_cmd_rx: UnboundedReceiver<DataCommand>,
111    exec_evt_rx: UnboundedReceiver<ExecutionEvent>,
112    exec_cmd_rx: UnboundedReceiver<TradingCommand>,
113    signal_rx: UnboundedReceiver<()>,
114    signal_tx: UnboundedSender<()>,
115}
116
117impl Default for AsyncRunner {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123impl Debug for AsyncRunner {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        f.debug_struct(stringify!(AsyncRunner)).finish()
126    }
127}
128
129impl AsyncRunner {
130    #[must_use]
131    pub fn new() -> Self {
132        let (time_evt_tx, time_evt_rx) =
133            tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
134        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
135        let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
136        let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
137        let (exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
138        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
139
140        set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
141        set_data_event_sender(data_evt_tx);
142        set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
143        set_exec_event_sender(exec_evt_tx);
144        set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
145
146        Self {
147            time_evt_rx,
148            data_evt_rx,
149            data_cmd_rx,
150            exec_evt_rx,
151            exec_cmd_rx,
152            signal_rx,
153            signal_tx,
154        }
155    }
156
157    /// Stops the runner with an internal shutdown signal.
158    pub fn stop(&self) {
159        if let Err(e) = self.signal_tx.send(()) {
160            log::error!("Failed to send shutdown signal: {e}");
161        }
162    }
163}
164
165impl AsyncRunner {
166    /// Runs the async runner event loop.
167    ///
168    /// This method processes data events, time events, execution events, and signal events in an async loop.
169    /// It will run until a signal is received or the event streams are closed.
170    pub async fn run(&mut self) {
171        log::info!("Starting AsyncRunner");
172
173        let data_engine_process = MessagingSwitchboard::data_engine_process();
174        let data_engine_response = MessagingSwitchboard::data_engine_response();
175        let data_engine_execute = MessagingSwitchboard::data_engine_execute();
176        let exec_engine_process = MessagingSwitchboard::exec_engine_process();
177        let exec_engine_execute = MessagingSwitchboard::exec_engine_execute();
178
179        loop {
180            tokio::select! {
181                Some(event) = self.data_evt_rx.recv() => {
182                    match event {
183                        DataEvent::Data(data) => msgbus::send_any(data_engine_process, &data),
184                        DataEvent::Response(resp) => {
185                            msgbus::send_any(data_engine_response, &resp);
186                        }
187                        #[cfg(feature = "defi")]
188                        DataEvent::DeFi(data) => msgbus::send_any(data_engine_process, &data),
189                    }
190                },
191                Some(handler) = self.time_evt_rx.recv() => {
192                    handler.run();
193                },
194                Some(cmd) = self.data_cmd_rx.recv() => {
195                    msgbus::send_any(data_engine_execute, &cmd);
196                },
197                Some(event) = self.exec_evt_rx.recv() => {
198                    match event {
199                        ExecutionEvent::Order(order_event) => {
200                            msgbus::send_any(exec_engine_process, &order_event);
201                        }
202                        ExecutionEvent::Report(report) => {
203                            match report {
204                                ExecutionReport::OrderStatus(r) => {
205                                    msgbus::send_any("ExecEngine.reconcile_execution_report".into(), &*r);
206                                }
207                                ExecutionReport::Fill(r) => {
208                                    msgbus::send_any("ExecEngine.reconcile_execution_report".into(), &*r);
209                                }
210                                ExecutionReport::Position(r) => {
211                                    msgbus::send_any("ExecEngine.reconcile_execution_report".into(), &*r);
212                                }
213                                ExecutionReport::Mass(r) => {
214                                    msgbus::send_any("ExecEngine.reconcile_execution_mass_status".into(), &*r);
215                                }
216                            }
217                        }
218                    }
219                },
220                Some(cmd) = self.exec_cmd_rx.recv() => {
221                    msgbus::send_any(exec_engine_execute, &cmd);
222                },
223                Some(()) = self.signal_rx.recv() => {
224                    tracing::info!("AsyncRunner received signal, shutting down");
225                    return; // Signal to stop
226                },
227                else => return, // Sentinel event ends run
228            };
229        }
230    }
231}
232
233////////////////////////////////////////////////////////////////////////////////
234// Tests
235////////////////////////////////////////////////////////////////////////////////
236
237#[cfg(test)]
238mod tests {
239    use std::time::Duration;
240
241    use nautilus_common::{
242        messages::{
243            ExecutionEvent, ExecutionReport,
244            data::{SubscribeCommand, SubscribeCustomData},
245            execution::TradingCommand,
246        },
247        timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
248    };
249    use nautilus_core::{UUID4, UnixNanos};
250    use nautilus_model::{
251        data::{Data, quote::QuoteTick},
252        enums::OrderSide,
253        events::{OrderEvent, OrderSubmitted},
254        identifiers::{
255            AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
256        },
257        types::{Price, Quantity},
258    };
259    use rstest::rstest;
260    use tokio::sync::mpsc;
261    use ustr::Ustr;
262
263    use super::*;
264
265    // Test fixture for creating test quotes
266    fn test_quote() -> QuoteTick {
267        QuoteTick {
268            instrument_id: InstrumentId::from("EUR/USD.SIM"),
269            bid_price: Price::from("1.10000"),
270            ask_price: Price::from("1.10001"),
271            bid_size: Quantity::from(1_000_000),
272            ask_size: Quantity::from(1_000_000),
273            ts_event: UnixNanos::default(),
274            ts_init: UnixNanos::default(),
275        }
276    }
277
278    #[rstest]
279    fn test_async_data_command_sender_creation() {
280        let (tx, _rx) = mpsc::unbounded_channel();
281        let sender = AsyncDataCommandSender::new(tx);
282        assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
283    }
284
285    #[rstest]
286    fn test_async_time_event_sender_creation() {
287        let (tx, _rx) = mpsc::unbounded_channel();
288        let sender = AsyncTimeEventSender::new(tx);
289        assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
290    }
291
292    #[rstest]
293    fn test_async_time_event_sender_get_channel() {
294        let (tx, _rx) = mpsc::unbounded_channel();
295        let sender = AsyncTimeEventSender::new(tx);
296        let channel = sender.get_channel_sender();
297
298        // Verify the channel is functional
299        let event = TimeEvent::new(
300            Ustr::from("test"),
301            UUID4::new(),
302            UnixNanos::from(1),
303            UnixNanos::from(2),
304        );
305        let callback = TimeEventCallback::from(|_: TimeEvent| {});
306        let handler = TimeEventHandlerV2::new(event, callback);
307
308        assert!(channel.send(handler).is_ok());
309    }
310
311    #[tokio::test]
312    async fn test_async_data_command_sender_execute() {
313        let (tx, mut rx) = mpsc::unbounded_channel();
314        let sender = AsyncDataCommandSender::new(tx);
315
316        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
317            client_id: Some(ClientId::from("TEST")),
318            venue: None,
319            data_type: nautilus_model::data::DataType::new("QuoteTick", None),
320            command_id: UUID4::new(),
321            ts_init: UnixNanos::default(),
322            params: None,
323        }));
324
325        sender.execute(command.clone());
326
327        let received = rx.recv().await.unwrap();
328        match (received, command) {
329            (
330                DataCommand::Subscribe(SubscribeCommand::Data(r)),
331                DataCommand::Subscribe(SubscribeCommand::Data(c)),
332            ) => {
333                assert_eq!(r.client_id, c.client_id);
334                assert_eq!(r.data_type, c.data_type);
335            }
336            _ => panic!("Command mismatch"),
337        }
338    }
339
340    #[tokio::test]
341    async fn test_async_time_event_sender_send() {
342        let (tx, mut rx) = mpsc::unbounded_channel();
343        let sender = AsyncTimeEventSender::new(tx);
344
345        let event = TimeEvent::new(
346            Ustr::from("test"),
347            UUID4::new(),
348            UnixNanos::from(1),
349            UnixNanos::from(2),
350        );
351        let callback = TimeEventCallback::from(|_: TimeEvent| {});
352        let handler = TimeEventHandlerV2::new(event, callback);
353
354        sender.send(handler);
355
356        assert!(rx.recv().await.is_some());
357    }
358
359    #[tokio::test]
360    async fn test_runner_shutdown_signal() {
361        // Create runner with manual channels to avoid global state
362        let (_data_tx, data_evt_rx) = mpsc::unbounded_channel::<DataEvent>();
363        let (_cmd_tx, data_cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
364        let (_time_tx, time_evt_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
365        let (_exec_evt_tx, exec_evt_rx) = mpsc::unbounded_channel::<ExecutionEvent>();
366        let (_exec_cmd_tx, exec_cmd_rx) = mpsc::unbounded_channel::<TradingCommand>();
367        let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();
368
369        let mut runner = AsyncRunner {
370            data_evt_rx,
371            data_cmd_rx,
372            time_evt_rx,
373            exec_evt_rx,
374            exec_cmd_rx,
375            signal_rx,
376            signal_tx: signal_tx.clone(),
377        };
378
379        // Start runner
380        let runner_handle = tokio::spawn(async move {
381            runner.run().await;
382        });
383
384        // Send shutdown signal
385        signal_tx.send(()).unwrap();
386
387        // Runner should stop quickly
388        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
389        assert!(result.is_ok(), "Runner should stop on signal");
390    }
391
392    #[tokio::test]
393    async fn test_runner_closes_on_channel_drop() {
394        let (data_tx, data_evt_rx) = mpsc::unbounded_channel::<DataEvent>();
395        let (_cmd_tx, data_cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
396        let (_time_tx, time_evt_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
397        let (_exec_evt_tx, exec_evt_rx) = mpsc::unbounded_channel::<ExecutionEvent>();
398        let (_exec_cmd_tx, exec_cmd_rx) = mpsc::unbounded_channel::<TradingCommand>();
399        let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();
400
401        let mut runner = AsyncRunner {
402            data_evt_rx,
403            data_cmd_rx,
404            time_evt_rx,
405            exec_evt_rx,
406            exec_cmd_rx,
407            signal_rx,
408            signal_tx: signal_tx.clone(),
409        };
410
411        // Start runner
412        let runner_handle = tokio::spawn(async move {
413            runner.run().await;
414        });
415
416        // Drop data sender to close channel - this should cause runner to exit
417        drop(data_tx);
418
419        // Send stop signal to ensure clean shutdown
420        tokio::time::sleep(Duration::from_millis(50)).await;
421        signal_tx.send(()).ok();
422
423        // Runner should stop when channels close or on signal
424        let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
425        assert!(
426            result.is_ok(),
427            "Runner should stop when channels close or on signal"
428        );
429    }
430
431    #[tokio::test]
432    async fn test_concurrent_event_sending() {
433        let (data_evt_tx, data_evt_rx) = mpsc::unbounded_channel::<DataEvent>();
434        let (_data_cmd_tx, data_cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
435        let (_time_evt_tx, time_evt_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
436        let (_exec_evt_tx, exec_evt_rx) = mpsc::unbounded_channel::<ExecutionEvent>();
437        let (_exec_cmd_tx, exec_cmd_rx) = mpsc::unbounded_channel::<TradingCommand>();
438        let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();
439
440        // Setup runner
441        let mut runner = AsyncRunner {
442            time_evt_rx,
443            data_evt_rx,
444            data_cmd_rx,
445            exec_evt_rx,
446            exec_cmd_rx,
447            signal_rx,
448            signal_tx: signal_tx.clone(),
449        };
450
451        // Spawn multiple concurrent senders
452        let mut handles = vec![];
453        for _ in 0..5 {
454            let tx_clone = data_evt_tx.clone();
455            let handle = tokio::spawn(async move {
456                for _ in 0..20 {
457                    let quote = test_quote();
458                    tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
459                    tokio::task::yield_now().await;
460                }
461            });
462            handles.push(handle);
463        }
464
465        // Start runner in background
466        let runner_handle = tokio::spawn(async move {
467            runner.run().await;
468        });
469
470        // Wait for all senders
471        for handle in handles {
472            handle.await.unwrap();
473        }
474
475        // Give runner time to process
476        tokio::time::sleep(Duration::from_millis(50)).await;
477
478        // Stop runner
479        signal_tx.send(()).unwrap();
480
481        let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
482    }
483
484    #[rstest]
485    #[case(10)]
486    #[case(100)]
487    #[case(1000)]
488    fn test_channel_send_performance(#[case] count: usize) {
489        let (tx, mut rx) = mpsc::unbounded_channel::<DataEvent>();
490        let quote = test_quote();
491
492        // Send events
493        for _ in 0..count {
494            tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
495        }
496
497        // Verify all received
498        let mut received = 0;
499        while rx.try_recv().is_ok() {
500            received += 1;
501        }
502
503        assert_eq!(received, count);
504    }
505
506    #[rstest]
507    fn test_async_trading_command_sender_creation() {
508        let (tx, _rx) = mpsc::unbounded_channel();
509        let sender = AsyncTradingCommandSender::new(tx);
510        assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
511    }
512
513    #[tokio::test]
514    async fn test_execution_event_order_channel() {
515        let (tx, mut rx) = mpsc::unbounded_channel::<ExecutionEvent>();
516
517        let event = OrderSubmitted::new(
518            TraderId::from("TRADER-001"),
519            StrategyId::from("S-001"),
520            InstrumentId::from("EUR/USD.SIM"),
521            ClientOrderId::from("O-001"),
522            AccountId::from("SIM-001"),
523            UUID4::new(),
524            UnixNanos::from(1),
525            UnixNanos::from(2),
526        );
527
528        tx.send(ExecutionEvent::Order(
529            nautilus_model::events::OrderEventAny::Submitted(event),
530        ))
531        .unwrap();
532
533        let received = rx.recv().await.unwrap();
534        match received {
535            ExecutionEvent::Order(nautilus_model::events::OrderEventAny::Submitted(e)) => {
536                assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
537            }
538            _ => panic!("Expected OrderSubmitted event"),
539        }
540    }
541
542    #[tokio::test]
543    async fn test_execution_report_order_status_channel() {
544        use nautilus_model::{
545            enums::{OrderStatus, OrderType, TimeInForce},
546            reports::OrderStatusReport,
547        };
548
549        let (tx, mut rx) = mpsc::unbounded_channel::<ExecutionEvent>();
550
551        let report = OrderStatusReport::new(
552            AccountId::from("SIM-001"),
553            InstrumentId::from("EUR/USD.SIM"),
554            Some(ClientOrderId::from("O-001")),
555            VenueOrderId::from("V-001"),
556            OrderSide::Buy,
557            OrderType::Market,
558            TimeInForce::Gtc,
559            OrderStatus::Accepted,
560            Quantity::from(100_000),
561            Quantity::from(100_000),
562            UnixNanos::from(1),
563            UnixNanos::from(2),
564            UnixNanos::from(3),
565            None,
566        );
567
568        tx.send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
569            Box::new(report),
570        )))
571        .unwrap();
572
573        let received = rx.recv().await.unwrap();
574        match received {
575            ExecutionEvent::Report(ExecutionReport::OrderStatus(r)) => {
576                assert_eq!(r.venue_order_id.as_str(), "V-001");
577                assert_eq!(r.order_status, OrderStatus::Accepted);
578            }
579            _ => panic!("Expected OrderStatusReport"),
580        }
581    }
582}