nautilus_live/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19    messages::{
20        DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
21    },
22    msgbus::{self, switchboard::MessagingSwitchboard},
23    runner::{
24        DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
25        set_data_event_sender, set_exec_cmd_sender, set_exec_event_sender, set_time_event_sender,
26    },
27    timer::TimeEventHandlerV2,
28};
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
30
31/// Asynchronous implementation of `DataCommandSender` for live environments.
32#[derive(Debug)]
33pub struct AsyncDataCommandSender {
34    cmd_tx: UnboundedSender<DataCommand>,
35}
36
37impl AsyncDataCommandSender {
38    #[must_use]
39    pub const fn new(cmd_tx: UnboundedSender<DataCommand>) -> Self {
40        Self { cmd_tx }
41    }
42}
43
44impl DataCommandSender for AsyncDataCommandSender {
45    fn execute(&self, command: DataCommand) {
46        if let Err(e) = self.cmd_tx.send(command) {
47            log::error!("Failed to send data command: {e}");
48        }
49    }
50}
51
52/// Asynchronous implementation of `TimeEventSender` for live environments.
53#[derive(Debug, Clone)]
54pub struct AsyncTimeEventSender {
55    time_tx: UnboundedSender<TimeEventHandlerV2>,
56}
57
58impl AsyncTimeEventSender {
59    #[must_use]
60    pub const fn new(time_tx: UnboundedSender<TimeEventHandlerV2>) -> Self {
61        Self { time_tx }
62    }
63
64    /// Gets a clone of the underlying channel sender for async use.
65    ///
66    /// This allows async contexts to get a direct channel sender that
67    /// can be moved into async tasks without `RefCell` borrowing issues.
68    #[must_use]
69    pub fn get_channel_sender(&self) -> UnboundedSender<TimeEventHandlerV2> {
70        self.time_tx.clone()
71    }
72}
73
74impl TimeEventSender for AsyncTimeEventSender {
75    fn send(&self, handler: TimeEventHandlerV2) {
76        if let Err(e) = self.time_tx.send(handler) {
77            log::error!("Failed to send time event handler: {e}");
78        }
79    }
80}
81
82/// Asynchronous implementation of `TradingCommandSender` for live environments.
83#[derive(Debug)]
84pub struct AsyncTradingCommandSender {
85    cmd_tx: UnboundedSender<TradingCommand>,
86}
87
88impl AsyncTradingCommandSender {
89    #[must_use]
90    pub const fn new(cmd_tx: UnboundedSender<TradingCommand>) -> Self {
91        Self { cmd_tx }
92    }
93}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96    fn execute(&self, command: TradingCommand) {
97        if let Err(e) = self.cmd_tx.send(command) {
98            log::error!("Failed to send trading command: {e}");
99        }
100    }
101}
102
/// Minimal runner abstraction exposing a single synchronous `run` entry point.
///
/// NOTE(review): no implementation of this trait is visible in this file; `AsyncRunner`
/// provides an inherent `async fn run` instead. Confirm where this trait is implemented.
pub trait Runner {
    /// Runs the runner, with mutable access to its state.
    fn run(&mut self);
}
106
/// Event loop state for live environments: the receiving halves of the runner's
/// channels plus both halves of the shutdown signal channel.
pub struct AsyncRunner {
    /// Time event handlers; each received handler is run by the loop.
    time_evt_rx: UnboundedReceiver<TimeEventHandlerV2>,
    /// Data events (data and responses) forwarded to the data engine endpoints.
    data_evt_rx: UnboundedReceiver<DataEvent>,
    /// Data commands forwarded to the data engine execute endpoint.
    data_cmd_rx: UnboundedReceiver<DataCommand>,
    /// Execution events (order events, reports, account states).
    exec_evt_rx: UnboundedReceiver<ExecutionEvent>,
    /// Trading commands forwarded to the execution engine execute endpoint.
    exec_cmd_rx: UnboundedReceiver<TradingCommand>,
    /// Shutdown signal receiver; receiving `()` ends `run`.
    signal_rx: UnboundedReceiver<()>,
    /// Shutdown signal sender used by `stop`.
    signal_tx: UnboundedSender<()>,
}
116
impl Default for AsyncRunner {
    /// Equivalent to calling `AsyncRunner::new()`.
    fn default() -> Self {
        Self::new()
    }
}
122
123impl Debug for AsyncRunner {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        f.debug_struct(stringify!(AsyncRunner)).finish()
126    }
127}
128
129impl AsyncRunner {
130    #[must_use]
131    pub fn new() -> Self {
132        use tokio::sync::mpsc::unbounded_channel; // Inlined for readability
133
134        let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandlerV2>();
135        let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
136        let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
137        let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
138        let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
139        let (signal_tx, signal_rx) = unbounded_channel::<()>();
140
141        set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
142        set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
143        set_data_event_sender(data_evt_tx);
144        set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
145        set_exec_event_sender(exec_evt_tx);
146
147        Self {
148            time_evt_rx,
149            data_evt_rx,
150            data_cmd_rx,
151            exec_evt_rx,
152            exec_cmd_rx,
153            signal_rx,
154            signal_tx,
155        }
156    }
157
158    /// Stops the runner with an internal shutdown signal.
159    pub fn stop(&self) {
160        if let Err(e) = self.signal_tx.send(()) {
161            log::error!("Failed to send shutdown signal: {e}");
162        }
163    }
164}
165
166impl AsyncRunner {
167    /// Runs the async runner event loop.
168    ///
169    /// This method processes data events, time events, execution events, and signal events in an async loop.
170    /// It will run until a signal is received or the event streams are closed.
171    pub async fn run(&mut self) {
172        log::info!("Starting AsyncRunner");
173
174        loop {
175            tokio::select! {
176                Some(handler) = self.time_evt_rx.recv() => {
177                    Self::handle_time_event(handler);
178                },
179                Some(cmd) = self.data_cmd_rx.recv() => {
180                    Self::handle_data_command(cmd);
181                },
182                Some(evt) = self.data_evt_rx.recv() => {
183                    Self::handle_data_event(evt);
184                },
185                Some(cmd) = self.exec_cmd_rx.recv() => {
186                    Self::handle_exec_command(cmd);
187                },
188                Some(evt) = self.exec_evt_rx.recv() => {
189                    Self::handle_exec_event(evt);
190                },
191                Some(()) = self.signal_rx.recv() => {
192                    tracing::info!("AsyncRunner received signal, shutting down");
193                    return; // Signal to stop
194                },
195                else => return, // Sentinel event ends run
196            };
197        }
198    }
199
200    #[inline]
201    fn handle_time_event(handler: TimeEventHandlerV2) {
202        handler.run();
203    }
204
205    #[inline]
206    fn handle_data_command(cmd: DataCommand) {
207        msgbus::send_any(MessagingSwitchboard::data_engine_execute(), &cmd);
208    }
209
210    #[inline]
211    fn handle_data_event(event: DataEvent) {
212        match event {
213            DataEvent::Data(data) => {
214                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
215            }
216            DataEvent::Response(resp) => {
217                msgbus::send_any(MessagingSwitchboard::data_engine_response(), &resp);
218            }
219            #[cfg(feature = "defi")]
220            DataEvent::DeFi(data) => {
221                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
222            }
223        }
224    }
225
226    #[inline]
227    fn handle_exec_command(cmd: TradingCommand) {
228        msgbus::send_any(MessagingSwitchboard::exec_engine_execute(), &cmd);
229    }
230
231    #[inline]
232    fn handle_exec_event(event: ExecutionEvent) {
233        match event {
234            ExecutionEvent::Order(order_event) => {
235                msgbus::send_any(MessagingSwitchboard::exec_engine_process(), &order_event);
236            }
237            ExecutionEvent::Report(report) => {
238                Self::handle_exec_report(report);
239            }
240            ExecutionEvent::Account(account) => {
241                msgbus::send_any(MessagingSwitchboard::portfolio_update_account(), &account);
242            }
243        }
244    }
245
246    #[inline]
247    fn handle_exec_report(report: ExecutionReport) {
248        match report {
249            ExecutionReport::OrderStatus(r) => {
250                msgbus::send_any(
251                    MessagingSwitchboard::exec_engine_reconcile_execution_report(),
252                    &*r,
253                );
254            }
255            ExecutionReport::Fill(r) => {
256                msgbus::send_any(
257                    MessagingSwitchboard::exec_engine_reconcile_execution_report(),
258                    &*r,
259                );
260            }
261            ExecutionReport::Position(r) => {
262                msgbus::send_any(
263                    MessagingSwitchboard::exec_engine_reconcile_execution_report(),
264                    &*r,
265                );
266            }
267            ExecutionReport::Mass(r) => {
268                msgbus::send_any(
269                    MessagingSwitchboard::exec_engine_reconcile_execution_mass_status(),
270                    &*r,
271                );
272            }
273        }
274    }
275}
276
277////////////////////////////////////////////////////////////////////////////////
278// Tests
279////////////////////////////////////////////////////////////////////////////////
280
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use nautilus_common::{
        messages::{
            ExecutionEvent, ExecutionReport,
            data::{SubscribeCommand, SubscribeCustomData},
            execution::TradingCommand,
        },
        timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
    };
    use nautilus_core::{UUID4, UnixNanos};
    use nautilus_model::{
        data::{Data, DataType, quote::QuoteTick},
        enums::{
            AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
            TimeInForce,
        },
        events::{OrderEvent, OrderEventAny, OrderSubmitted, account::state::AccountState},
        identifiers::{
            AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
            TraderId, VenueOrderId,
        },
        reports::{FillReport, OrderStatusReport, PositionStatusReport},
        types::{Money, Price, Quantity},
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;

    // Test fixture for creating test quotes
    fn test_quote() -> QuoteTick {
        QuoteTick {
            instrument_id: InstrumentId::from("EUR/USD.SIM"),
            bid_price: Price::from("1.10000"),
            ask_price: Price::from("1.10001"),
            bid_size: Quantity::from(1_000_000),
            ask_size: Quantity::from(1_000_000),
            ts_event: UnixNanos::default(),
            ts_init: UnixNanos::default(),
        }
    }

    // Test fixture for creating a time event handler with a no-op callback
    fn test_time_event_handler() -> TimeEventHandlerV2 {
        let event = TimeEvent::new(
            Ustr::from("test"),
            UUID4::new(),
            UnixNanos::from(1),
            UnixNanos::from(2),
        );
        let callback = TimeEventCallback::from(|_: TimeEvent| {});
        TimeEventHandlerV2::new(event, callback)
    }

    #[rstest]
    fn test_async_data_command_sender_creation() {
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
        let sender = AsyncDataCommandSender::new(tx);
        assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
    }

    #[rstest]
    fn test_async_time_event_sender_creation() {
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
        let sender = AsyncTimeEventSender::new(tx);
        assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
    }

    #[rstest]
    fn test_async_time_event_sender_get_channel() {
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
        let sender = AsyncTimeEventSender::new(tx);
        let channel = sender.get_channel_sender();

        // Verify the channel is functional
        assert!(channel.send(test_time_event_handler()).is_ok());
    }

    #[tokio::test]
    async fn test_async_data_command_sender_execute() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let sender = AsyncDataCommandSender::new(tx);

        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
            client_id: Some(ClientId::from("TEST")),
            venue: None,
            data_type: DataType::new("QuoteTick", None),
            command_id: UUID4::new(),
            ts_init: UnixNanos::default(),
            params: None,
        }));

        sender.execute(command.clone());

        let received = rx.recv().await.unwrap();
        match (received, command) {
            (
                DataCommand::Subscribe(SubscribeCommand::Data(r)),
                DataCommand::Subscribe(SubscribeCommand::Data(c)),
            ) => {
                assert_eq!(r.client_id, c.client_id);
                assert_eq!(r.data_type, c.data_type);
            }
            _ => panic!("Command mismatch"),
        }
    }

    #[tokio::test]
    async fn test_async_time_event_sender_send() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let sender = AsyncTimeEventSender::new(tx);

        sender.send(test_time_event_handler());

        assert!(rx.recv().await.is_some());
    }

    #[tokio::test]
    async fn test_runner_shutdown_signal() {
        // Create runner with manual channels to avoid global state
        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();

        let mut runner = AsyncRunner {
            data_evt_rx,
            data_cmd_rx,
            time_evt_rx,
            exec_evt_rx,
            exec_cmd_rx,
            signal_rx,
            signal_tx: signal_tx.clone(),
        };

        // Start runner
        let runner_handle = tokio::spawn(async move {
            runner.run().await;
        });

        // Send shutdown signal
        signal_tx.send(()).unwrap();

        // Runner should stop quickly
        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
        assert!(result.is_ok(), "Runner should stop on signal");
    }

    #[tokio::test]
    async fn test_runner_closes_on_channel_drop() {
        let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();

        let mut runner = AsyncRunner {
            data_evt_rx,
            data_cmd_rx,
            time_evt_rx,
            exec_evt_rx,
            exec_cmd_rx,
            signal_rx,
            signal_tx: signal_tx.clone(),
        };

        // Start runner
        let runner_handle = tokio::spawn(async move {
            runner.run().await;
        });

        // Drop data sender to close the data event channel. The other channels (and the
        // runner's own signal sender) remain open, so this alone does not end the loop;
        // the runner must keep running without misbehaving until it is signalled.
        drop(data_tx);

        // Send stop signal to ensure clean shutdown
        tokio::time::sleep(Duration::from_millis(50)).await;
        signal_tx.send(()).ok();

        // Runner should stop when channels close or on signal
        let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
        assert!(
            result.is_ok(),
            "Runner should stop when channels close or on signal"
        );
    }

    #[tokio::test]
    async fn test_concurrent_event_sending() {
        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
        let (_time_evt_tx, time_evt_rx) =
            tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();

        // Setup runner
        let mut runner = AsyncRunner {
            time_evt_rx,
            data_evt_rx,
            data_cmd_rx,
            exec_evt_rx,
            exec_cmd_rx,
            signal_rx,
            signal_tx: signal_tx.clone(),
        };

        // Spawn multiple concurrent senders
        let mut handles = vec![];
        for _ in 0..5 {
            let tx_clone = data_evt_tx.clone();
            let handle = tokio::spawn(async move {
                for _ in 0..20 {
                    let quote = test_quote();
                    tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
                    tokio::task::yield_now().await;
                }
            });
            handles.push(handle);
        }

        // Start runner in background
        let runner_handle = tokio::spawn(async move {
            runner.run().await;
        });

        // Wait for all senders
        for handle in handles {
            handle.await.unwrap();
        }

        // Give runner time to process
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Stop runner
        signal_tx.send(()).unwrap();

        // Fail the test if the runner does not terminate, instead of ignoring the timeout
        let result = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
        assert!(
            result.is_ok(),
            "Runner should stop on signal after concurrent sends"
        );
    }

    #[rstest]
    #[case(10)]
    #[case(100)]
    #[case(1000)]
    fn test_channel_send_performance(#[case] count: usize) {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let quote = test_quote();

        // Send events
        for _ in 0..count {
            tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
        }

        // Verify all received
        let mut received = 0;
        while rx.try_recv().is_ok() {
            received += 1;
        }

        assert_eq!(received, count);
    }

    #[rstest]
    fn test_async_trading_command_sender_creation() {
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
        let sender = AsyncTradingCommandSender::new(tx);
        assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
    }

    #[tokio::test]
    async fn test_execution_event_order_channel() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();

        let event = OrderSubmitted::new(
            TraderId::from("TRADER-001"),
            StrategyId::from("S-001"),
            InstrumentId::from("EUR/USD.SIM"),
            ClientOrderId::from("O-001"),
            AccountId::from("SIM-001"),
            UUID4::new(),
            UnixNanos::from(1),
            UnixNanos::from(2),
        );

        tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
            .unwrap();

        let received = rx.recv().await.unwrap();
        match received {
            ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
                assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
            }
            _ => panic!("Expected OrderSubmitted event"),
        }
    }

    #[tokio::test]
    async fn test_execution_report_order_status_channel() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();

        let report = OrderStatusReport::new(
            AccountId::from("SIM-001"),
            InstrumentId::from("EUR/USD.SIM"),
            Some(ClientOrderId::from("O-001")),
            VenueOrderId::from("V-001"),
            OrderSide::Buy,
            OrderType::Market,
            TimeInForce::Gtc,
            OrderStatus::Accepted,
            Quantity::from(100_000),
            Quantity::from(100_000),
            UnixNanos::from(1),
            UnixNanos::from(2),
            UnixNanos::from(3),
            None,
        );

        tx.send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
            Box::new(report),
        )))
        .unwrap();

        let received = rx.recv().await.unwrap();
        match received {
            ExecutionEvent::Report(ExecutionReport::OrderStatus(r)) => {
                assert_eq!(r.venue_order_id.as_str(), "V-001");
                assert_eq!(r.order_status, OrderStatus::Accepted);
            }
            _ => panic!("Expected OrderStatusReport"),
        }
    }

    #[tokio::test]
    async fn test_execution_report_fill() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();

        let report = FillReport::new(
            AccountId::from("SIM-001"),
            InstrumentId::from("EUR/USD.SIM"),
            VenueOrderId::from("V-001"),
            TradeId::from("T-001"),
            OrderSide::Buy,
            Quantity::from(100_000),
            Price::from("1.10000"),
            Money::from("10 USD"),
            LiquiditySide::Taker,
            Some(ClientOrderId::from("O-001")),
            None,
            UnixNanos::from(1),
            UnixNanos::from(2),
            None,
        );

        tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
            report,
        ))))
        .unwrap();

        let received = rx.recv().await.unwrap();
        match received {
            ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
                assert_eq!(r.venue_order_id.as_str(), "V-001");
                assert_eq!(r.trade_id.to_string(), "T-001");
            }
            _ => panic!("Expected FillReport"),
        }
    }

    #[tokio::test]
    async fn test_execution_report_position() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();

        let report = PositionStatusReport::new(
            AccountId::from("SIM-001"),
            InstrumentId::from("EUR/USD.SIM"),
            PositionSideSpecified::Long,
            Quantity::from(100_000),
            UnixNanos::from(1),
            UnixNanos::from(2),
            None,
            Some(PositionId::from("P-001")),
            None,
        );

        tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
            report,
        ))))
        .unwrap();

        let received = rx.recv().await.unwrap();
        match received {
            ExecutionEvent::Report(ExecutionReport::Position(r)) => {
                assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
            }
            _ => panic!("Expected PositionStatusReport"),
        }
    }

    #[tokio::test]
    async fn test_execution_event_account() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();

        let account_state = AccountState::new(
            AccountId::from("SIM-001"),
            AccountType::Cash,
            vec![],
            vec![],
            true,
            UUID4::new(),
            UnixNanos::from(1),
            UnixNanos::from(2),
            None,
        );

        tx.send(ExecutionEvent::Account(account_state)).unwrap();

        let received = rx.recv().await.unwrap();
        match received {
            ExecutionEvent::Account(r) => {
                assert_eq!(r.account_id.as_str(), "SIM-001");
            }
            _ => panic!("Expected AccountState"),
        }
    }

    #[tokio::test]
    async fn test_runner_stop_method() {
        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();

        let mut runner = AsyncRunner {
            data_evt_rx,
            data_cmd_rx,
            time_evt_rx,
            exec_evt_rx,
            exec_cmd_rx,
            signal_rx,
            signal_tx: signal_tx.clone(),
        };

        let runner_handle = tokio::spawn(async move {
            runner.run().await;
        });

        // Use stop method instead of sending signal directly. The running runner has been
        // moved into the task, so a second runner sharing the same signal sender (with
        // throwaway receivers for everything else) is used to call `stop()`.
        let stopper = AsyncRunner {
            data_evt_rx: tokio::sync::mpsc::unbounded_channel::<DataEvent>().1,
            data_cmd_rx: tokio::sync::mpsc::unbounded_channel::<DataCommand>().1,
            time_evt_rx: tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>().1,
            exec_evt_rx: tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>().1,
            exec_cmd_rx: tokio::sync::mpsc::unbounded_channel::<TradingCommand>().1,
            signal_rx: tokio::sync::mpsc::unbounded_channel::<()>().1,
            signal_tx,
        };

        stopper.stop();

        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
        assert!(result.is_ok(), "Runner should stop when stop() is called");
    }

    #[tokio::test]
    async fn test_all_event_types_integration() {
        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
        let (time_evt_tx, time_evt_rx) =
            tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
        let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();

        let mut runner = AsyncRunner {
            time_evt_rx,
            data_evt_rx,
            data_cmd_rx,
            exec_evt_rx,
            exec_cmd_rx,
            signal_rx,
            signal_tx: signal_tx.clone(),
        };

        let runner_handle = tokio::spawn(async move {
            runner.run().await;
        });

        // Send data event
        let quote = test_quote();
        data_evt_tx
            .send(DataEvent::Data(Data::Quote(quote)))
            .unwrap();

        // Send data command
        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
            client_id: Some(ClientId::from("TEST")),
            venue: None,
            data_type: DataType::new("QuoteTick", None),
            command_id: UUID4::new(),
            ts_init: UnixNanos::default(),
            params: None,
        }));
        data_cmd_tx.send(command).unwrap();

        // Send time event
        time_evt_tx.send(test_time_event_handler()).unwrap();

        // Send execution order event
        let order_event = OrderSubmitted::new(
            TraderId::from("TRADER-001"),
            StrategyId::from("S-001"),
            InstrumentId::from("EUR/USD.SIM"),
            ClientOrderId::from("O-001"),
            AccountId::from("SIM-001"),
            UUID4::new(),
            UnixNanos::from(1),
            UnixNanos::from(2),
        );
        exec_evt_tx
            .send(ExecutionEvent::Order(OrderEventAny::Submitted(order_event)))
            .unwrap();

        // Send execution report (OrderStatus)
        let order_status = OrderStatusReport::new(
            AccountId::from("SIM-001"),
            InstrumentId::from("EUR/USD.SIM"),
            Some(ClientOrderId::from("O-001")),
            VenueOrderId::from("V-001"),
            OrderSide::Buy,
            OrderType::Market,
            TimeInForce::Gtc,
            OrderStatus::Accepted,
            Quantity::from(100_000),
            Quantity::from(100_000),
            UnixNanos::from(1),
            UnixNanos::from(2),
            UnixNanos::from(3),
            None,
        );
        exec_evt_tx
            .send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
                Box::new(order_status),
            )))
            .unwrap();

        // Send execution report (Fill)
        let fill = FillReport::new(
            AccountId::from("SIM-001"),
            InstrumentId::from("EUR/USD.SIM"),
            VenueOrderId::from("V-001"),
            TradeId::from("T-001"),
            OrderSide::Buy,
            Quantity::from(100_000),
            Price::from("1.10000"),
            Money::from("10 USD"),
            LiquiditySide::Taker,
            Some(ClientOrderId::from("O-001")),
            None,
            UnixNanos::from(1),
            UnixNanos::from(2),
            None,
        );
        exec_evt_tx
            .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
                fill,
            ))))
            .unwrap();

        // Send execution report (Position)
        let position = PositionStatusReport::new(
            AccountId::from("SIM-001"),
            InstrumentId::from("EUR/USD.SIM"),
            PositionSideSpecified::Long,
            Quantity::from(100_000),
            UnixNanos::from(1),
            UnixNanos::from(2),
            None,
            Some(PositionId::from("P-001")),
            None,
        );
        exec_evt_tx
            .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
                position,
            ))))
            .unwrap();

        // Send account event
        let account_state = AccountState::new(
            AccountId::from("SIM-001"),
            AccountType::Cash,
            vec![],
            vec![],
            true,
            UUID4::new(),
            UnixNanos::from(1),
            UnixNanos::from(2),
            None,
        );
        exec_evt_tx
            .send(ExecutionEvent::Account(account_state))
            .unwrap();

        // Give runner time to process all events
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Stop runner
        signal_tx.send(()).unwrap();

        let result = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
        assert!(
            result.is_ok(),
            "Runner should process all event types and stop cleanly"
        );
    }
}