nautilus_live/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19    live::runner::{set_data_event_sender, set_exec_event_sender},
20    messages::{
21        DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
22    },
23    msgbus::{self, switchboard::MessagingSwitchboard},
24    runner::{
25        DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
26        set_exec_cmd_sender, set_time_event_sender,
27    },
28    timer::TimeEventHandlerV2,
29};
30
/// Asynchronous implementation of `DataCommandSender` for live environments.
///
/// Holds the sending half of an unbounded tokio channel carrying [`DataCommand`]s.
#[derive(Debug)]
pub struct AsyncDataCommandSender {
    /// Sending half of the unbounded data command channel.
    cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
}
36
impl AsyncDataCommandSender {
    /// Creates a new [`AsyncDataCommandSender`] that sends commands into `cmd_tx`.
    #[must_use]
    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>) -> Self {
        Self { cmd_tx }
    }
}
43
44impl DataCommandSender for AsyncDataCommandSender {
45    fn execute(&self, command: DataCommand) {
46        if let Err(e) = self.cmd_tx.send(command) {
47            log::error!("Failed to send data command: {e}");
48        }
49    }
50}
51
/// Asynchronous implementation of `TimeEventSender` for live environments.
///
/// Holds the sending half of an unbounded tokio channel carrying
/// [`TimeEventHandlerV2`] values.
#[derive(Debug, Clone)]
pub struct AsyncTimeEventSender {
    /// Sending half of the unbounded time event channel.
    time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2>,
}
57
impl AsyncTimeEventSender {
    /// Creates a new [`AsyncTimeEventSender`] that sends handlers into `time_tx`.
    #[must_use]
    pub const fn new(time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2>) -> Self {
        Self { time_tx }
    }

    /// Gets a clone of the underlying channel sender for async use.
    ///
    /// This allows async contexts to get a direct channel sender that
    /// can be moved into async tasks without `RefCell` borrowing issues.
    /// The returned sender feeds the same channel as this instance.
    #[must_use]
    pub fn get_channel_sender(&self) -> tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2> {
        self.time_tx.clone()
    }
}
73
74impl TimeEventSender for AsyncTimeEventSender {
75    fn send(&self, handler: TimeEventHandlerV2) {
76        if let Err(e) = self.time_tx.send(handler) {
77            log::error!("Failed to send time event handler: {e}");
78        }
79    }
80}
81
/// Asynchronous implementation of `TradingCommandSender` for live environments.
///
/// Holds the sending half of an unbounded tokio channel carrying [`TradingCommand`]s.
#[derive(Debug)]
pub struct AsyncTradingCommandSender {
    /// Sending half of the unbounded trading command channel.
    cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
}
87
impl AsyncTradingCommandSender {
    /// Creates a new [`AsyncTradingCommandSender`] that sends commands into `cmd_tx`.
    #[must_use]
    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>) -> Self {
        Self { cmd_tx }
    }
}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96    fn execute(&self, command: TradingCommand) {
97        if let Err(e) = self.cmd_tx.send(command) {
98            log::error!("Failed to send trading command: {e}");
99        }
100    }
101}
102
/// Interface for runners driven through a synchronous `run` call.
///
/// NOTE(review): no implementation of this trait appears in the visible part of
/// this file; [`AsyncRunner`] exposes its own inherent async `run` instead.
pub trait Runner {
    /// Runs the runner, with mutable access to its state.
    fn run(&mut self);
}
106
/// Channel receivers for the async event loop.
///
/// These can be extracted from `AsyncRunner` via `take_channels()` to drive
/// the event loop directly on the same thread as the msgbus endpoints.
#[derive(Debug)]
pub struct AsyncRunnerChannels {
    /// Receiver for time event handlers.
    pub time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
    /// Receiver for data events.
    pub data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
    /// Receiver for data commands.
    pub data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
    /// Receiver for execution events.
    pub exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
    /// Receiver for trading (execution) commands.
    pub exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
}
119
/// Event loop runner that receives time events, data events/commands and
/// execution events/commands over unbounded tokio channels.
pub struct AsyncRunner {
    /// Receiving halves of the event and command channels.
    channels: AsyncRunnerChannels,
    /// Receiver for the shutdown signal.
    signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
    /// Sender for the shutdown signal; cloned into each [`AsyncRunnerHandle`].
    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
}
125
/// Handle for stopping the [`AsyncRunner`] from another context.
///
/// Cloning the handle clones the underlying signal sender, so every clone
/// signals the same runner.
#[derive(Clone, Debug)]
pub struct AsyncRunnerHandle {
    /// Sender for the runner's shutdown signal channel.
    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
}
131
132impl AsyncRunnerHandle {
133    /// Signals the runner to stop.
134    pub fn stop(&self) {
135        if let Err(e) = self.signal_tx.send(()) {
136            log::error!("Failed to send shutdown signal: {e}");
137        }
138    }
139}
140
impl Default for AsyncRunner {
    /// Equivalent to [`AsyncRunner::new`], including the sender registration it performs.
    fn default() -> Self {
        Self::new()
    }
}
146
impl Debug for AsyncRunner {
    /// Formats the runner as its bare type name; no fields are included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(AsyncRunner)).finish()
    }
}
152
153impl AsyncRunner {
154    /// Creates a new [`AsyncRunner`] instance.
155    #[must_use]
156    pub fn new() -> Self {
157        use tokio::sync::mpsc::unbounded_channel; // tokio-import-ok
158
159        let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandlerV2>();
160        let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
161        let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
162        let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
163        let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
164        let (signal_tx, signal_rx) = unbounded_channel::<()>();
165
166        set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
167        set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
168        set_data_event_sender(data_evt_tx);
169        set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
170        set_exec_event_sender(exec_evt_tx);
171
172        Self {
173            channels: AsyncRunnerChannels {
174                time_evt_rx,
175                data_evt_rx,
176                data_cmd_rx,
177                exec_evt_rx,
178                exec_cmd_rx,
179            },
180            signal_rx,
181            signal_tx,
182        }
183    }
184
185    /// Stops the runner with an internal shutdown signal.
186    pub fn stop(&self) {
187        if let Err(e) = self.signal_tx.send(()) {
188            log::error!("Failed to send shutdown signal: {e}");
189        }
190    }
191
192    /// Returns a handle that can be used to stop the runner from another context.
193    #[must_use]
194    pub fn handle(&self) -> AsyncRunnerHandle {
195        AsyncRunnerHandle {
196            signal_tx: self.signal_tx.clone(),
197        }
198    }
199
200    /// Consumes the runner and returns the channel receivers for direct event loop driving.
201    ///
202    /// This is used when the event loop needs to run on the same thread as the msgbus
203    /// endpoints (which use thread-local storage).
204    #[must_use]
205    pub fn take_channels(self) -> AsyncRunnerChannels {
206        self.channels
207    }
208}
209
210impl AsyncRunner {
211    /// Runs the async runner event loop.
212    ///
213    /// This method processes data events, time events, execution events, and signal events in an async loop.
214    /// It will run until a signal is received or the event streams are closed.
215    pub async fn run(&mut self) {
216        log::info!("AsyncRunner starting");
217
218        loop {
219            tokio::select! {
220                Some(()) = self.signal_rx.recv() => {
221                    tracing::info!("AsyncRunner received signal, shutting down");
222                    return;
223                },
224                Some(handler) = self.channels.time_evt_rx.recv() => {
225                    Self::handle_time_event(handler);
226                },
227                Some(cmd) = self.channels.data_cmd_rx.recv() => {
228                    Self::handle_data_command(cmd);
229                },
230                Some(evt) = self.channels.data_evt_rx.recv() => {
231                    Self::handle_data_event(evt);
232                },
233                Some(cmd) = self.channels.exec_cmd_rx.recv() => {
234                    Self::handle_exec_command(cmd);
235                },
236                Some(evt) = self.channels.exec_evt_rx.recv() => {
237                    Self::handle_exec_event(evt);
238                },
239                else => {
240                    tracing::debug!("AsyncRunner all channels closed, exiting");
241                    return;
242                }
243            };
244        }
245    }
246
247    /// Handles a time event by running its callback.
248    #[inline]
249    pub fn handle_time_event(handler: TimeEventHandlerV2) {
250        handler.run();
251    }
252
253    /// Handles a data command by sending to the DataEngine.
254    #[inline]
255    pub fn handle_data_command(cmd: DataCommand) {
256        msgbus::send_any(MessagingSwitchboard::data_engine_execute(), &cmd);
257    }
258
259    /// Handles a data event by sending to the appropriate DataEngine endpoint.
260    #[inline]
261    pub fn handle_data_event(event: DataEvent) {
262        match event {
263            DataEvent::Data(data) => {
264                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
265            }
266            DataEvent::Instrument(data) => {
267                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
268            }
269            DataEvent::Response(resp) => {
270                msgbus::send_any(MessagingSwitchboard::data_engine_response(), &resp);
271            }
272            #[cfg(feature = "defi")]
273            DataEvent::DeFi(data) => {
274                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
275            }
276        }
277    }
278
279    /// Handles an execution command by sending to the ExecEngine.
280    #[inline]
281    pub fn handle_exec_command(cmd: TradingCommand) {
282        msgbus::send_any(MessagingSwitchboard::exec_engine_execute(), &cmd);
283    }
284
285    /// Handles an execution event by sending to the appropriate engine endpoint.
286    #[inline]
287    pub fn handle_exec_event(event: ExecutionEvent) {
288        match event {
289            ExecutionEvent::Order(ref order_event) => {
290                msgbus::send_any(MessagingSwitchboard::exec_engine_process(), order_event);
291            }
292            ExecutionEvent::Report(report) => {
293                Self::handle_exec_report(report);
294            }
295            ExecutionEvent::Account(ref account) => {
296                msgbus::send_any(MessagingSwitchboard::portfolio_update_account(), account);
297            }
298        }
299    }
300
301    #[inline]
302    pub fn handle_exec_report(report: ExecutionReport) {
303        match report {
304            ExecutionReport::OrderStatus(r) => {
305                msgbus::send_any(
306                    MessagingSwitchboard::exec_engine_reconcile_execution_report(),
307                    &*r,
308                );
309            }
310            ExecutionReport::Fill(r) => {
311                msgbus::send_any(
312                    MessagingSwitchboard::exec_engine_reconcile_execution_report(),
313                    &*r,
314                );
315            }
316            ExecutionReport::Position(r) => {
317                msgbus::send_any(
318                    MessagingSwitchboard::exec_engine_reconcile_execution_report(),
319                    &*r,
320                );
321            }
322            ExecutionReport::Mass(r) => {
323                msgbus::send_any(
324                    MessagingSwitchboard::exec_engine_reconcile_execution_mass_status(),
325                    &*r,
326                );
327            }
328        }
329    }
330}
331
332#[cfg(test)]
333mod tests {
334    use std::time::Duration;
335
336    use nautilus_common::{
337        messages::{
338            ExecutionEvent, ExecutionReport,
339            data::{SubscribeCommand, SubscribeCustomData},
340            execution::TradingCommand,
341        },
342        timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
343    };
344    use nautilus_core::{UUID4, UnixNanos};
345    use nautilus_model::{
346        data::{Data, DataType, quote::QuoteTick},
347        enums::{
348            AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
349            TimeInForce,
350        },
351        events::{OrderEvent, OrderEventAny, OrderSubmitted, account::state::AccountState},
352        identifiers::{
353            AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
354            TraderId, VenueOrderId,
355        },
356        reports::{FillReport, OrderStatusReport, PositionStatusReport},
357        types::{Money, Price, Quantity},
358    };
359    use rstest::rstest;
360    use ustr::Ustr;
361
362    use super::*;
363
364    // Test fixture for creating test quotes
365    fn test_quote() -> QuoteTick {
366        QuoteTick {
367            instrument_id: InstrumentId::from("EUR/USD.SIM"),
368            bid_price: Price::from("1.10000"),
369            ask_price: Price::from("1.10001"),
370            bid_size: Quantity::from(1_000_000),
371            ask_size: Quantity::from(1_000_000),
372            ts_event: UnixNanos::default(),
373            ts_init: UnixNanos::default(),
374        }
375    }
376
377    // Test helper to create AsyncRunner with manual channels
378    fn create_test_runner(
379        time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
380        data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
381        data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
382        exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
383        exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
384        signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
385        signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
386    ) -> AsyncRunner {
387        AsyncRunner {
388            channels: AsyncRunnerChannels {
389                time_evt_rx,
390                data_evt_rx,
391                data_cmd_rx,
392                exec_evt_rx,
393                exec_cmd_rx,
394            },
395            signal_rx,
396            signal_tx,
397        }
398    }
399
400    #[rstest]
401    fn test_async_data_command_sender_creation() {
402        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
403        let sender = AsyncDataCommandSender::new(tx);
404        assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
405    }
406
407    #[rstest]
408    fn test_async_time_event_sender_creation() {
409        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
410        let sender = AsyncTimeEventSender::new(tx);
411        assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
412    }
413
414    #[rstest]
415    fn test_async_time_event_sender_get_channel() {
416        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
417        let sender = AsyncTimeEventSender::new(tx);
418        let channel = sender.get_channel_sender();
419
420        // Verify the channel is functional
421        let event = TimeEvent::new(
422            Ustr::from("test"),
423            UUID4::new(),
424            UnixNanos::from(1),
425            UnixNanos::from(2),
426        );
427        let callback = TimeEventCallback::from(|_: TimeEvent| {});
428        let handler = TimeEventHandlerV2::new(event, callback);
429
430        assert!(channel.send(handler).is_ok());
431    }
432
433    #[tokio::test]
434    async fn test_async_data_command_sender_execute() {
435        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
436        let sender = AsyncDataCommandSender::new(tx);
437
438        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
439            client_id: Some(ClientId::from("TEST")),
440            venue: None,
441            data_type: DataType::new("QuoteTick", None),
442            command_id: UUID4::new(),
443            ts_init: UnixNanos::default(),
444            params: None,
445        }));
446
447        sender.execute(command.clone());
448
449        let received = rx.recv().await.unwrap();
450        match (received, command) {
451            (
452                DataCommand::Subscribe(SubscribeCommand::Data(r)),
453                DataCommand::Subscribe(SubscribeCommand::Data(c)),
454            ) => {
455                assert_eq!(r.client_id, c.client_id);
456                assert_eq!(r.data_type, c.data_type);
457            }
458            _ => panic!("Command mismatch"),
459        }
460    }
461
462    #[tokio::test]
463    async fn test_async_time_event_sender_send() {
464        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
465        let sender = AsyncTimeEventSender::new(tx);
466
467        let event = TimeEvent::new(
468            Ustr::from("test"),
469            UUID4::new(),
470            UnixNanos::from(1),
471            UnixNanos::from(2),
472        );
473        let callback = TimeEventCallback::from(|_: TimeEvent| {});
474        let handler = TimeEventHandlerV2::new(event, callback);
475
476        sender.send(handler);
477
478        assert!(rx.recv().await.is_some());
479    }
480
481    #[tokio::test]
482    async fn test_runner_shutdown_signal() {
483        // Create runner with manual channels to avoid global state
484        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
485        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
486        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
487        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
488        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
489        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
490
491        let mut runner = create_test_runner(
492            time_evt_rx,
493            data_evt_rx,
494            data_cmd_rx,
495            exec_evt_rx,
496            exec_cmd_rx,
497            signal_rx,
498            signal_tx.clone(),
499        );
500
501        // Start runner
502        let runner_handle = tokio::spawn(async move {
503            runner.run().await;
504        });
505
506        // Send shutdown signal
507        signal_tx.send(()).unwrap();
508
509        // Runner should stop quickly
510        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
511        assert!(result.is_ok(), "Runner should stop on signal");
512    }
513
    /// Drops the data event sender while the runner is active, then checks that
    /// the runner still terminates once a shutdown signal is sent.
    #[tokio::test]
    async fn test_runner_closes_on_channel_drop() {
        let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();

        let mut runner = create_test_runner(
            time_evt_rx,
            data_evt_rx,
            data_cmd_rx,
            exec_evt_rx,
            exec_cmd_rx,
            signal_rx,
            signal_tx.clone(),
        );

        // Start runner
        let runner_handle = tokio::spawn(async move {
            runner.run().await;
        });

        // Drop the only data event sender, closing that one channel.
        // NOTE(review): the other senders, and the runner's own `signal_tx`, are
        // still alive at this point, so this drop alone does not appear to end
        // `run`; the signal sent below is what stops it. Confirm the intent.
        drop(data_tx);

        // Give the runner time to observe the closed channel, then request shutdown
        tokio::time::sleep(Duration::from_millis(50)).await;
        signal_tx.send(()).ok();

        // The runner task must finish before the timeout elapses
        let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
        assert!(
            result.is_ok(),
            "Runner should stop when channels close or on signal"
        );
    }
552
553    #[tokio::test]
554    async fn test_concurrent_event_sending() {
555        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
556        let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
557        let (_time_evt_tx, time_evt_rx) =
558            tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
559        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
560        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
561        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
562
563        // Setup runner
564        let mut runner = create_test_runner(
565            time_evt_rx,
566            data_evt_rx,
567            data_cmd_rx,
568            exec_evt_rx,
569            exec_cmd_rx,
570            signal_rx,
571            signal_tx.clone(),
572        );
573
574        // Spawn multiple concurrent senders
575        let mut handles = vec![];
576        for _ in 0..5 {
577            let tx_clone = data_evt_tx.clone();
578            let handle = tokio::spawn(async move {
579                for _ in 0..20 {
580                    let quote = test_quote();
581                    tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
582                    tokio::task::yield_now().await;
583                }
584            });
585            handles.push(handle);
586        }
587
588        // Start runner in background
589        let runner_handle = tokio::spawn(async move {
590            runner.run().await;
591        });
592
593        // Wait for all senders
594        for handle in handles {
595            handle.await.unwrap();
596        }
597
598        // Give runner time to process
599        tokio::time::sleep(Duration::from_millis(50)).await;
600
601        // Stop runner
602        signal_tx.send(()).unwrap();
603
604        let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
605    }
606
607    #[rstest]
608    #[case(10)]
609    #[case(100)]
610    #[case(1000)]
611    fn test_channel_send_performance(#[case] count: usize) {
612        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
613        let quote = test_quote();
614
615        // Send events
616        for _ in 0..count {
617            tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
618        }
619
620        // Verify all received
621        let mut received = 0;
622        while rx.try_recv().is_ok() {
623            received += 1;
624        }
625
626        assert_eq!(received, count);
627    }
628
629    #[rstest]
630    fn test_async_trading_command_sender_creation() {
631        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
632        let sender = AsyncTradingCommandSender::new(tx);
633        assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
634    }
635
636    #[tokio::test]
637    async fn test_execution_event_order_channel() {
638        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
639
640        let event = OrderSubmitted::new(
641            TraderId::from("TRADER-001"),
642            StrategyId::from("S-001"),
643            InstrumentId::from("EUR/USD.SIM"),
644            ClientOrderId::from("O-001"),
645            AccountId::from("SIM-001"),
646            UUID4::new(),
647            UnixNanos::from(1),
648            UnixNanos::from(2),
649        );
650
651        tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
652            .unwrap();
653
654        let received = rx.recv().await.unwrap();
655        match received {
656            ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
657                assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
658            }
659            _ => panic!("Expected OrderSubmitted event"),
660        }
661    }
662
663    #[tokio::test]
664    async fn test_execution_report_order_status_channel() {
665        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
666
667        let report = OrderStatusReport::new(
668            AccountId::from("SIM-001"),
669            InstrumentId::from("EUR/USD.SIM"),
670            Some(ClientOrderId::from("O-001")),
671            VenueOrderId::from("V-001"),
672            OrderSide::Buy,
673            OrderType::Market,
674            TimeInForce::Gtc,
675            OrderStatus::Accepted,
676            Quantity::from(100_000),
677            Quantity::from(100_000),
678            UnixNanos::from(1),
679            UnixNanos::from(2),
680            UnixNanos::from(3),
681            None,
682        );
683
684        tx.send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
685            Box::new(report),
686        )))
687        .unwrap();
688
689        let received = rx.recv().await.unwrap();
690        match received {
691            ExecutionEvent::Report(ExecutionReport::OrderStatus(r)) => {
692                assert_eq!(r.venue_order_id.as_str(), "V-001");
693                assert_eq!(r.order_status, OrderStatus::Accepted);
694            }
695            _ => panic!("Expected OrderStatusReport"),
696        }
697    }
698
699    #[tokio::test]
700    async fn test_execution_report_fill() {
701        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
702
703        let report = FillReport::new(
704            AccountId::from("SIM-001"),
705            InstrumentId::from("EUR/USD.SIM"),
706            VenueOrderId::from("V-001"),
707            TradeId::from("T-001"),
708            OrderSide::Buy,
709            Quantity::from(100_000),
710            Price::from("1.10000"),
711            Money::from("10 USD"),
712            LiquiditySide::Taker,
713            Some(ClientOrderId::from("O-001")),
714            None,
715            UnixNanos::from(1),
716            UnixNanos::from(2),
717            None,
718        );
719
720        tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
721            report,
722        ))))
723        .unwrap();
724
725        let received = rx.recv().await.unwrap();
726        match received {
727            ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
728                assert_eq!(r.venue_order_id.as_str(), "V-001");
729                assert_eq!(r.trade_id.to_string(), "T-001");
730            }
731            _ => panic!("Expected FillReport"),
732        }
733    }
734
735    #[tokio::test]
736    async fn test_execution_report_position() {
737        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
738
739        let report = PositionStatusReport::new(
740            AccountId::from("SIM-001"),
741            InstrumentId::from("EUR/USD.SIM"),
742            PositionSideSpecified::Long,
743            Quantity::from(100_000),
744            UnixNanos::from(1),
745            UnixNanos::from(2),
746            None,
747            Some(PositionId::from("P-001")),
748            None,
749        );
750
751        tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
752            report,
753        ))))
754        .unwrap();
755
756        let received = rx.recv().await.unwrap();
757        match received {
758            ExecutionEvent::Report(ExecutionReport::Position(r)) => {
759                assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
760            }
761            _ => panic!("Expected PositionStatusReport"),
762        }
763    }
764
765    #[tokio::test]
766    async fn test_execution_event_account() {
767        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
768
769        let account_state = AccountState::new(
770            AccountId::from("SIM-001"),
771            AccountType::Cash,
772            vec![],
773            vec![],
774            true,
775            UUID4::new(),
776            UnixNanos::from(1),
777            UnixNanos::from(2),
778            None,
779        );
780
781        tx.send(ExecutionEvent::Account(account_state)).unwrap();
782
783        let received = rx.recv().await.unwrap();
784        match received {
785            ExecutionEvent::Account(r) => {
786                assert_eq!(r.account_id.as_str(), "SIM-001");
787            }
788            _ => panic!("Expected AccountState"),
789        }
790    }
791
792    #[tokio::test]
793    async fn test_runner_stop_method() {
794        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
795        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
796        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
797        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
798        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
799        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
800
801        let mut runner = create_test_runner(
802            time_evt_rx,
803            data_evt_rx,
804            data_cmd_rx,
805            exec_evt_rx,
806            exec_cmd_rx,
807            signal_rx,
808            signal_tx.clone(),
809        );
810
811        let runner_handle = tokio::spawn(async move {
812            runner.run().await;
813        });
814
815        // Use stop via signal_tx directly
816        signal_tx.send(()).unwrap();
817
818        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
819        assert!(result.is_ok(), "Runner should stop when stop() is called");
820    }
821
822    #[tokio::test]
823    async fn test_all_event_types_integration() {
824        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
825        let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
826        let (time_evt_tx, time_evt_rx) =
827            tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
828        let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
829        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
830        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
831
832        let mut runner = create_test_runner(
833            time_evt_rx,
834            data_evt_rx,
835            data_cmd_rx,
836            exec_evt_rx,
837            exec_cmd_rx,
838            signal_rx,
839            signal_tx.clone(),
840        );
841
842        let runner_handle = tokio::spawn(async move {
843            runner.run().await;
844        });
845
846        // Send data event
847        let quote = test_quote();
848        data_evt_tx
849            .send(DataEvent::Data(Data::Quote(quote)))
850            .unwrap();
851
852        // Send data command
853        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
854            client_id: Some(ClientId::from("TEST")),
855            venue: None,
856            data_type: DataType::new("QuoteTick", None),
857            command_id: UUID4::new(),
858            ts_init: UnixNanos::default(),
859            params: None,
860        }));
861        data_cmd_tx.send(command).unwrap();
862
863        // Send time event
864        let event = TimeEvent::new(
865            Ustr::from("test"),
866            UUID4::new(),
867            UnixNanos::from(1),
868            UnixNanos::from(2),
869        );
870        let callback = TimeEventCallback::from(|_: TimeEvent| {});
871        let handler = TimeEventHandlerV2::new(event, callback);
872        time_evt_tx.send(handler).unwrap();
873
874        // Send execution order event
875        let order_event = OrderSubmitted::new(
876            TraderId::from("TRADER-001"),
877            StrategyId::from("S-001"),
878            InstrumentId::from("EUR/USD.SIM"),
879            ClientOrderId::from("O-001"),
880            AccountId::from("SIM-001"),
881            UUID4::new(),
882            UnixNanos::from(1),
883            UnixNanos::from(2),
884        );
885        exec_evt_tx
886            .send(ExecutionEvent::Order(OrderEventAny::Submitted(order_event)))
887            .unwrap();
888
889        // Send execution report (OrderStatus)
890        let order_status = OrderStatusReport::new(
891            AccountId::from("SIM-001"),
892            InstrumentId::from("EUR/USD.SIM"),
893            Some(ClientOrderId::from("O-001")),
894            VenueOrderId::from("V-001"),
895            OrderSide::Buy,
896            OrderType::Market,
897            TimeInForce::Gtc,
898            OrderStatus::Accepted,
899            Quantity::from(100_000),
900            Quantity::from(100_000),
901            UnixNanos::from(1),
902            UnixNanos::from(2),
903            UnixNanos::from(3),
904            None,
905        );
906        exec_evt_tx
907            .send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
908                Box::new(order_status),
909            )))
910            .unwrap();
911
912        // Send execution report (Fill)
913        let fill = FillReport::new(
914            AccountId::from("SIM-001"),
915            InstrumentId::from("EUR/USD.SIM"),
916            VenueOrderId::from("V-001"),
917            TradeId::from("T-001"),
918            OrderSide::Buy,
919            Quantity::from(100_000),
920            Price::from("1.10000"),
921            Money::from("10 USD"),
922            LiquiditySide::Taker,
923            Some(ClientOrderId::from("O-001")),
924            None,
925            UnixNanos::from(1),
926            UnixNanos::from(2),
927            None,
928        );
929        exec_evt_tx
930            .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
931                fill,
932            ))))
933            .unwrap();
934
935        // Send execution report (Position)
936        let position = PositionStatusReport::new(
937            AccountId::from("SIM-001"),
938            InstrumentId::from("EUR/USD.SIM"),
939            PositionSideSpecified::Long,
940            Quantity::from(100_000),
941            UnixNanos::from(1),
942            UnixNanos::from(2),
943            None,
944            Some(PositionId::from("P-001")),
945            None,
946        );
947        exec_evt_tx
948            .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
949                position,
950            ))))
951            .unwrap();
952
953        // Send account event
954        let account_state = AccountState::new(
955            AccountId::from("SIM-001"),
956            AccountType::Cash,
957            vec![],
958            vec![],
959            true,
960            UUID4::new(),
961            UnixNanos::from(1),
962            UnixNanos::from(2),
963            None,
964        );
965        exec_evt_tx
966            .send(ExecutionEvent::Account(account_state))
967            .unwrap();
968
969        // Give runner time to process all events
970        tokio::time::sleep(Duration::from_millis(100)).await;
971
972        // Stop runner
973        signal_tx.send(()).unwrap();
974
975        let result = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
976        assert!(
977            result.is_ok(),
978            "Runner should process all event types and stop cleanly"
979        );
980    }
981
982    #[tokio::test]
983    async fn test_runner_handle_stops_runner() {
984        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
985        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
986        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
987        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
988        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
989        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
990
991        let mut runner = create_test_runner(
992            time_evt_rx,
993            data_evt_rx,
994            data_cmd_rx,
995            exec_evt_rx,
996            exec_cmd_rx,
997            signal_rx,
998            signal_tx.clone(),
999        );
1000
1001        // Get handle before moving runner
1002        let handle = runner.handle();
1003
1004        let runner_task = tokio::spawn(async move {
1005            runner.run().await;
1006        });
1007
1008        // Use handle to stop
1009        handle.stop();
1010
1011        let result = tokio::time::timeout(Duration::from_millis(100), runner_task).await;
1012        assert!(result.is_ok(), "Runner should stop via handle");
1013    }
1014
1015    #[tokio::test]
1016    async fn test_runner_handle_is_cloneable() {
1017        let (signal_tx, _signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1018        let handle = AsyncRunnerHandle { signal_tx };
1019
1020        let handle2 = handle.clone();
1021
1022        // Both handles should be able to send stop signals
1023        assert!(handle.signal_tx.send(()).is_ok());
1024        assert!(handle2.signal_tx.send(()).is_ok());
1025    }
1026
1027    #[tokio::test]
1028    async fn test_runner_processes_events_before_stop() {
1029        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1030        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1031        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
1032        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1033        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1034        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1035
1036        let mut runner = create_test_runner(
1037            time_evt_rx,
1038            data_evt_rx,
1039            data_cmd_rx,
1040            exec_evt_rx,
1041            exec_cmd_rx,
1042            signal_rx,
1043            signal_tx.clone(),
1044        );
1045
1046        let handle = runner.handle();
1047
1048        // Send events before starting runner
1049        for _ in 0..10 {
1050            let quote = test_quote();
1051            data_evt_tx
1052                .send(DataEvent::Data(Data::Quote(quote)))
1053                .unwrap();
1054        }
1055
1056        let runner_task = tokio::spawn(async move {
1057            runner.run().await;
1058        });
1059
1060        // Give runner time to process queued events
1061        tokio::time::sleep(Duration::from_millis(50)).await;
1062
1063        // Stop runner
1064        handle.stop();
1065
1066        let result = tokio::time::timeout(Duration::from_millis(200), runner_task).await;
1067        assert!(result.is_ok(), "Runner should process events and stop");
1068    }
1069}