nautilus_live/
runner.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19    live::runner::{set_data_event_sender, set_exec_event_sender},
20    messages::{
21        DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
22    },
23    msgbus::{self, switchboard::MessagingSwitchboard},
24    runner::{
25        DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
26        set_exec_cmd_sender, set_time_event_sender,
27    },
28    timer::TimeEventHandlerV2,
29};
30
31/// Asynchronous implementation of `DataCommandSender` for live environments.
32#[derive(Debug)]
33pub struct AsyncDataCommandSender {
34    cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
35}
36
37impl AsyncDataCommandSender {
38    #[must_use]
39    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>) -> Self {
40        Self { cmd_tx }
41    }
42}
43
44impl DataCommandSender for AsyncDataCommandSender {
45    fn execute(&self, command: DataCommand) {
46        if let Err(e) = self.cmd_tx.send(command) {
47            log::error!("Failed to send data command: {e}");
48        }
49    }
50}
51
52/// Asynchronous implementation of `TimeEventSender` for live environments.
53#[derive(Debug, Clone)]
54pub struct AsyncTimeEventSender {
55    time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2>,
56}
57
58impl AsyncTimeEventSender {
59    #[must_use]
60    pub const fn new(time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2>) -> Self {
61        Self { time_tx }
62    }
63
64    /// Gets a clone of the underlying channel sender for async use.
65    ///
66    /// This allows async contexts to get a direct channel sender that
67    /// can be moved into async tasks without `RefCell` borrowing issues.
68    #[must_use]
69    pub fn get_channel_sender(&self) -> tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2> {
70        self.time_tx.clone()
71    }
72}
73
74impl TimeEventSender for AsyncTimeEventSender {
75    fn send(&self, handler: TimeEventHandlerV2) {
76        if let Err(e) = self.time_tx.send(handler) {
77            log::error!("Failed to send time event handler: {e}");
78        }
79    }
80}
81
82/// Asynchronous implementation of `TradingCommandSender` for live environments.
83#[derive(Debug)]
84pub struct AsyncTradingCommandSender {
85    cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
86}
87
88impl AsyncTradingCommandSender {
89    #[must_use]
90    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>) -> Self {
91        Self { cmd_tx }
92    }
93}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96    fn execute(&self, command: TradingCommand) {
97        if let Err(e) = self.cmd_tx.send(command) {
98            log::error!("Failed to send trading command: {e}");
99        }
100    }
101}
102
/// Minimal interface for a component that can be run.
///
/// Implementors provide [`Runner::run`], which takes the runner by mutable
/// reference and returns nothing.
pub trait Runner {
    /// Runs the runner.
    fn run(&mut self);
}
106
107/// Channel receivers for the async event loop.
108///
109/// These can be extracted from `AsyncRunner` via `take_channels()` to drive
110/// the event loop directly on the same thread as the msgbus endpoints.
111#[derive(Debug)]
112pub struct AsyncRunnerChannels {
113    pub time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
114    pub data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
115    pub data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
116    pub exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
117    pub exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
118}
119
120pub struct AsyncRunner {
121    channels: AsyncRunnerChannels,
122    signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
123    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
124}
125
126/// Handle for stopping the AsyncRunner from another context.
127#[derive(Clone, Debug)]
128pub struct AsyncRunnerHandle {
129    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
130}
131
132impl AsyncRunnerHandle {
133    /// Signals the runner to stop.
134    pub fn stop(&self) {
135        if let Err(e) = self.signal_tx.send(()) {
136            log::error!("Failed to send shutdown signal: {e}");
137        }
138    }
139}
140
141impl Default for AsyncRunner {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147impl Debug for AsyncRunner {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.debug_struct(stringify!(AsyncRunner)).finish()
150    }
151}
152
153impl AsyncRunner {
154    /// Creates a new [`AsyncRunner`] instance.
155    #[must_use]
156    pub fn new() -> Self {
157        use tokio::sync::mpsc::unbounded_channel; // tokio-import-ok
158
159        let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandlerV2>();
160        let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
161        let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
162        let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
163        let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
164        let (signal_tx, signal_rx) = unbounded_channel::<()>();
165
166        set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
167        set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
168        set_data_event_sender(data_evt_tx);
169        set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
170        set_exec_event_sender(exec_evt_tx);
171
172        Self {
173            channels: AsyncRunnerChannels {
174                time_evt_rx,
175                data_evt_rx,
176                data_cmd_rx,
177                exec_evt_rx,
178                exec_cmd_rx,
179            },
180            signal_rx,
181            signal_tx,
182        }
183    }
184
185    /// Stops the runner with an internal shutdown signal.
186    pub fn stop(&self) {
187        if let Err(e) = self.signal_tx.send(()) {
188            log::error!("Failed to send shutdown signal: {e}");
189        }
190    }
191
192    /// Returns a handle that can be used to stop the runner from another context.
193    #[must_use]
194    pub fn handle(&self) -> AsyncRunnerHandle {
195        AsyncRunnerHandle {
196            signal_tx: self.signal_tx.clone(),
197        }
198    }
199
200    /// Consumes the runner and returns the channel receivers for direct event loop driving.
201    ///
202    /// This is used when the event loop needs to run on the same thread as the msgbus
203    /// endpoints (which use thread-local storage).
204    #[must_use]
205    pub fn take_channels(self) -> AsyncRunnerChannels {
206        self.channels
207    }
208
209    /// Runs the async runner event loop.
210    ///
211    /// This method processes data events, time events, execution events, and signal events in an async loop.
212    /// It will run until a signal is received or the event streams are closed.
213    pub async fn run(&mut self) {
214        log::info!("AsyncRunner starting");
215
216        loop {
217            tokio::select! {
218                Some(()) = self.signal_rx.recv() => {
219                    tracing::info!("AsyncRunner received signal, shutting down");
220                    return;
221                },
222                Some(handler) = self.channels.time_evt_rx.recv() => {
223                    Self::handle_time_event(handler);
224                },
225                Some(cmd) = self.channels.data_cmd_rx.recv() => {
226                    Self::handle_data_command(cmd);
227                },
228                Some(evt) = self.channels.data_evt_rx.recv() => {
229                    Self::handle_data_event(evt);
230                },
231                Some(cmd) = self.channels.exec_cmd_rx.recv() => {
232                    Self::handle_exec_command(cmd);
233                },
234                Some(evt) = self.channels.exec_evt_rx.recv() => {
235                    Self::handle_exec_event(evt);
236                },
237                else => {
238                    tracing::debug!("AsyncRunner all channels closed, exiting");
239                    return;
240                }
241            };
242        }
243    }
244
245    /// Handles a time event by running its callback.
246    #[inline]
247    pub fn handle_time_event(handler: TimeEventHandlerV2) {
248        handler.run();
249    }
250
251    /// Handles a data command by sending to the DataEngine.
252    #[inline]
253    pub fn handle_data_command(cmd: DataCommand) {
254        msgbus::send_any(MessagingSwitchboard::data_engine_execute(), &cmd);
255    }
256
257    /// Handles a data event by sending to the appropriate DataEngine endpoint.
258    #[inline]
259    pub fn handle_data_event(event: DataEvent) {
260        match event {
261            DataEvent::Data(data) => {
262                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
263            }
264            DataEvent::Instrument(data) => {
265                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
266            }
267            DataEvent::Response(resp) => {
268                msgbus::send_any(MessagingSwitchboard::data_engine_response(), &resp);
269            }
270            #[cfg(feature = "defi")]
271            DataEvent::DeFi(data) => {
272                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
273            }
274        }
275    }
276
277    /// Handles an execution command by sending to the ExecEngine.
278    #[inline]
279    pub fn handle_exec_command(cmd: TradingCommand) {
280        msgbus::send_any(MessagingSwitchboard::exec_engine_execute(), &cmd);
281    }
282
283    /// Handles an execution event by sending to the appropriate engine endpoint.
284    #[inline]
285    pub fn handle_exec_event(event: ExecutionEvent) {
286        match event {
287            ExecutionEvent::Order(ref order_event) => {
288                msgbus::send_any(MessagingSwitchboard::exec_engine_process(), order_event);
289            }
290            ExecutionEvent::Report(report) => {
291                Self::handle_exec_report(report);
292            }
293            ExecutionEvent::Account(ref account) => {
294                msgbus::send_any(MessagingSwitchboard::portfolio_update_account(), account);
295            }
296        }
297    }
298
299    #[inline]
300    pub fn handle_exec_report(report: ExecutionReport) {
301        match report {
302            ExecutionReport::OrderStatus(r) => {
303                msgbus::send_any(
304                    MessagingSwitchboard::exec_engine_reconcile_execution_report(),
305                    &*r,
306                );
307            }
308            ExecutionReport::Fill(r) => {
309                msgbus::send_any(
310                    MessagingSwitchboard::exec_engine_reconcile_execution_report(),
311                    &*r,
312                );
313            }
314            ExecutionReport::Position(r) => {
315                msgbus::send_any(
316                    MessagingSwitchboard::exec_engine_reconcile_execution_report(),
317                    &*r,
318                );
319            }
320            ExecutionReport::Mass(r) => {
321                msgbus::send_any(
322                    MessagingSwitchboard::exec_engine_reconcile_execution_mass_status(),
323                    &*r,
324                );
325            }
326        }
327    }
328}
329
330#[cfg(test)]
331mod tests {
332    use std::time::Duration;
333
334    use nautilus_common::{
335        messages::{
336            ExecutionEvent, ExecutionReport,
337            data::{SubscribeCommand, SubscribeCustomData},
338            execution::TradingCommand,
339        },
340        timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
341    };
342    use nautilus_core::{UUID4, UnixNanos};
343    use nautilus_model::{
344        data::{Data, DataType, quote::QuoteTick},
345        enums::{
346            AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
347            TimeInForce,
348        },
349        events::{OrderEvent, OrderEventAny, OrderSubmitted, account::state::AccountState},
350        identifiers::{
351            AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
352            TraderId, VenueOrderId,
353        },
354        reports::{FillReport, OrderStatusReport, PositionStatusReport},
355        types::{Money, Price, Quantity},
356    };
357    use rstest::rstest;
358    use ustr::Ustr;
359
360    use super::*;
361
362    // Test fixture for creating test quotes
363    fn test_quote() -> QuoteTick {
364        QuoteTick {
365            instrument_id: InstrumentId::from("EUR/USD.SIM"),
366            bid_price: Price::from("1.10000"),
367            ask_price: Price::from("1.10001"),
368            bid_size: Quantity::from(1_000_000),
369            ask_size: Quantity::from(1_000_000),
370            ts_event: UnixNanos::default(),
371            ts_init: UnixNanos::default(),
372        }
373    }
374
375    // Test helper to create AsyncRunner with manual channels
376    fn create_test_runner(
377        time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
378        data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
379        data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
380        exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
381        exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
382        signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
383        signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
384    ) -> AsyncRunner {
385        AsyncRunner {
386            channels: AsyncRunnerChannels {
387                time_evt_rx,
388                data_evt_rx,
389                data_cmd_rx,
390                exec_evt_rx,
391                exec_cmd_rx,
392            },
393            signal_rx,
394            signal_tx,
395        }
396    }
397
398    #[rstest]
399    fn test_async_data_command_sender_creation() {
400        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
401        let sender = AsyncDataCommandSender::new(tx);
402        assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
403    }
404
405    #[rstest]
406    fn test_async_time_event_sender_creation() {
407        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
408        let sender = AsyncTimeEventSender::new(tx);
409        assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
410    }
411
412    #[rstest]
413    fn test_async_time_event_sender_get_channel() {
414        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
415        let sender = AsyncTimeEventSender::new(tx);
416        let channel = sender.get_channel_sender();
417
418        // Verify the channel is functional
419        let event = TimeEvent::new(
420            Ustr::from("test"),
421            UUID4::new(),
422            UnixNanos::from(1),
423            UnixNanos::from(2),
424        );
425        let callback = TimeEventCallback::from(|_: TimeEvent| {});
426        let handler = TimeEventHandlerV2::new(event, callback);
427
428        assert!(channel.send(handler).is_ok());
429    }
430
431    #[tokio::test]
432    async fn test_async_data_command_sender_execute() {
433        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
434        let sender = AsyncDataCommandSender::new(tx);
435
436        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
437            client_id: Some(ClientId::from("TEST")),
438            venue: None,
439            data_type: DataType::new("QuoteTick", None),
440            command_id: UUID4::new(),
441            ts_init: UnixNanos::default(),
442            params: None,
443        }));
444
445        sender.execute(command.clone());
446
447        let received = rx.recv().await.unwrap();
448        match (received, command) {
449            (
450                DataCommand::Subscribe(SubscribeCommand::Data(r)),
451                DataCommand::Subscribe(SubscribeCommand::Data(c)),
452            ) => {
453                assert_eq!(r.client_id, c.client_id);
454                assert_eq!(r.data_type, c.data_type);
455            }
456            _ => panic!("Command mismatch"),
457        }
458    }
459
460    #[tokio::test]
461    async fn test_async_time_event_sender_send() {
462        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
463        let sender = AsyncTimeEventSender::new(tx);
464
465        let event = TimeEvent::new(
466            Ustr::from("test"),
467            UUID4::new(),
468            UnixNanos::from(1),
469            UnixNanos::from(2),
470        );
471        let callback = TimeEventCallback::from(|_: TimeEvent| {});
472        let handler = TimeEventHandlerV2::new(event, callback);
473
474        sender.send(handler);
475
476        assert!(rx.recv().await.is_some());
477    }
478
479    #[tokio::test]
480    async fn test_runner_shutdown_signal() {
481        // Create runner with manual channels to avoid global state
482        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
483        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
484        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
485        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
486        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
487        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
488
489        let mut runner = create_test_runner(
490            time_evt_rx,
491            data_evt_rx,
492            data_cmd_rx,
493            exec_evt_rx,
494            exec_cmd_rx,
495            signal_rx,
496            signal_tx.clone(),
497        );
498
499        // Start runner
500        let runner_handle = tokio::spawn(async move {
501            runner.run().await;
502        });
503
504        // Send shutdown signal
505        signal_tx.send(()).unwrap();
506
507        // Runner should stop quickly
508        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
509        assert!(result.is_ok(), "Runner should stop on signal");
510    }
511
512    #[tokio::test]
513    async fn test_runner_closes_on_channel_drop() {
514        let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
515        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
516        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
517        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
518        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
519        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
520
521        let mut runner = create_test_runner(
522            time_evt_rx,
523            data_evt_rx,
524            data_cmd_rx,
525            exec_evt_rx,
526            exec_cmd_rx,
527            signal_rx,
528            signal_tx.clone(),
529        );
530
531        // Start runner
532        let runner_handle = tokio::spawn(async move {
533            runner.run().await;
534        });
535
536        // Drop data sender to close channel - this should cause runner to exit
537        drop(data_tx);
538
539        // Send stop signal to ensure clean shutdown
540        tokio::time::sleep(Duration::from_millis(50)).await;
541        signal_tx.send(()).ok();
542
543        // Runner should stop when channels close or on signal
544        let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
545        assert!(
546            result.is_ok(),
547            "Runner should stop when channels close or on signal"
548        );
549    }
550
551    #[tokio::test]
552    async fn test_concurrent_event_sending() {
553        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
554        let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
555        let (_time_evt_tx, time_evt_rx) =
556            tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
557        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
558        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
559        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
560
561        // Setup runner
562        let mut runner = create_test_runner(
563            time_evt_rx,
564            data_evt_rx,
565            data_cmd_rx,
566            exec_evt_rx,
567            exec_cmd_rx,
568            signal_rx,
569            signal_tx.clone(),
570        );
571
572        // Spawn multiple concurrent senders
573        let mut handles = vec![];
574        for _ in 0..5 {
575            let tx_clone = data_evt_tx.clone();
576            let handle = tokio::spawn(async move {
577                for _ in 0..20 {
578                    let quote = test_quote();
579                    tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
580                    tokio::task::yield_now().await;
581                }
582            });
583            handles.push(handle);
584        }
585
586        // Start runner in background
587        let runner_handle = tokio::spawn(async move {
588            runner.run().await;
589        });
590
591        // Wait for all senders
592        for handle in handles {
593            handle.await.unwrap();
594        }
595
596        // Give runner time to process
597        tokio::time::sleep(Duration::from_millis(50)).await;
598
599        // Stop runner
600        signal_tx.send(()).unwrap();
601
602        let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
603    }
604
605    #[rstest]
606    #[case(10)]
607    #[case(100)]
608    #[case(1000)]
609    fn test_channel_send_performance(#[case] count: usize) {
610        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
611        let quote = test_quote();
612
613        // Send events
614        for _ in 0..count {
615            tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
616        }
617
618        // Verify all received
619        let mut received = 0;
620        while rx.try_recv().is_ok() {
621            received += 1;
622        }
623
624        assert_eq!(received, count);
625    }
626
627    #[rstest]
628    fn test_async_trading_command_sender_creation() {
629        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
630        let sender = AsyncTradingCommandSender::new(tx);
631        assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
632    }
633
634    #[tokio::test]
635    async fn test_execution_event_order_channel() {
636        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
637
638        let event = OrderSubmitted::new(
639            TraderId::from("TRADER-001"),
640            StrategyId::from("S-001"),
641            InstrumentId::from("EUR/USD.SIM"),
642            ClientOrderId::from("O-001"),
643            AccountId::from("SIM-001"),
644            UUID4::new(),
645            UnixNanos::from(1),
646            UnixNanos::from(2),
647        );
648
649        tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
650            .unwrap();
651
652        let received = rx.recv().await.unwrap();
653        match received {
654            ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
655                assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
656            }
657            _ => panic!("Expected OrderSubmitted event"),
658        }
659    }
660
661    #[tokio::test]
662    async fn test_execution_report_order_status_channel() {
663        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
664
665        let report = OrderStatusReport::new(
666            AccountId::from("SIM-001"),
667            InstrumentId::from("EUR/USD.SIM"),
668            Some(ClientOrderId::from("O-001")),
669            VenueOrderId::from("V-001"),
670            OrderSide::Buy,
671            OrderType::Market,
672            TimeInForce::Gtc,
673            OrderStatus::Accepted,
674            Quantity::from(100_000),
675            Quantity::from(100_000),
676            UnixNanos::from(1),
677            UnixNanos::from(2),
678            UnixNanos::from(3),
679            None,
680        );
681
682        tx.send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
683            Box::new(report),
684        )))
685        .unwrap();
686
687        let received = rx.recv().await.unwrap();
688        match received {
689            ExecutionEvent::Report(ExecutionReport::OrderStatus(r)) => {
690                assert_eq!(r.venue_order_id.as_str(), "V-001");
691                assert_eq!(r.order_status, OrderStatus::Accepted);
692            }
693            _ => panic!("Expected OrderStatusReport"),
694        }
695    }
696
697    #[tokio::test]
698    async fn test_execution_report_fill() {
699        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
700
701        let report = FillReport::new(
702            AccountId::from("SIM-001"),
703            InstrumentId::from("EUR/USD.SIM"),
704            VenueOrderId::from("V-001"),
705            TradeId::from("T-001"),
706            OrderSide::Buy,
707            Quantity::from(100_000),
708            Price::from("1.10000"),
709            Money::from("10 USD"),
710            LiquiditySide::Taker,
711            Some(ClientOrderId::from("O-001")),
712            None,
713            UnixNanos::from(1),
714            UnixNanos::from(2),
715            None,
716        );
717
718        tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
719            report,
720        ))))
721        .unwrap();
722
723        let received = rx.recv().await.unwrap();
724        match received {
725            ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
726                assert_eq!(r.venue_order_id.as_str(), "V-001");
727                assert_eq!(r.trade_id.to_string(), "T-001");
728            }
729            _ => panic!("Expected FillReport"),
730        }
731    }
732
733    #[tokio::test]
734    async fn test_execution_report_position() {
735        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
736
737        let report = PositionStatusReport::new(
738            AccountId::from("SIM-001"),
739            InstrumentId::from("EUR/USD.SIM"),
740            PositionSideSpecified::Long,
741            Quantity::from(100_000),
742            UnixNanos::from(1),
743            UnixNanos::from(2),
744            None,
745            Some(PositionId::from("P-001")),
746            None,
747        );
748
749        tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
750            report,
751        ))))
752        .unwrap();
753
754        let received = rx.recv().await.unwrap();
755        match received {
756            ExecutionEvent::Report(ExecutionReport::Position(r)) => {
757                assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
758            }
759            _ => panic!("Expected PositionStatusReport"),
760        }
761    }
762
763    #[tokio::test]
764    async fn test_execution_event_account() {
765        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
766
767        let account_state = AccountState::new(
768            AccountId::from("SIM-001"),
769            AccountType::Cash,
770            vec![],
771            vec![],
772            true,
773            UUID4::new(),
774            UnixNanos::from(1),
775            UnixNanos::from(2),
776            None,
777        );
778
779        tx.send(ExecutionEvent::Account(account_state)).unwrap();
780
781        let received = rx.recv().await.unwrap();
782        match received {
783            ExecutionEvent::Account(r) => {
784                assert_eq!(r.account_id.as_str(), "SIM-001");
785            }
786            _ => panic!("Expected AccountState"),
787        }
788    }
789
790    #[tokio::test]
791    async fn test_runner_stop_method() {
792        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
793        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
794        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
795        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
796        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
797        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
798
799        let mut runner = create_test_runner(
800            time_evt_rx,
801            data_evt_rx,
802            data_cmd_rx,
803            exec_evt_rx,
804            exec_cmd_rx,
805            signal_rx,
806            signal_tx.clone(),
807        );
808
809        let runner_handle = tokio::spawn(async move {
810            runner.run().await;
811        });
812
813        // Use stop via signal_tx directly
814        signal_tx.send(()).unwrap();
815
816        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
817        assert!(result.is_ok(), "Runner should stop when stop() is called");
818    }
819
820    #[tokio::test]
821    async fn test_all_event_types_integration() {
822        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
823        let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
824        let (time_evt_tx, time_evt_rx) =
825            tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
826        let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
827        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
828        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
829
830        let mut runner = create_test_runner(
831            time_evt_rx,
832            data_evt_rx,
833            data_cmd_rx,
834            exec_evt_rx,
835            exec_cmd_rx,
836            signal_rx,
837            signal_tx.clone(),
838        );
839
840        let runner_handle = tokio::spawn(async move {
841            runner.run().await;
842        });
843
844        // Send data event
845        let quote = test_quote();
846        data_evt_tx
847            .send(DataEvent::Data(Data::Quote(quote)))
848            .unwrap();
849
850        // Send data command
851        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
852            client_id: Some(ClientId::from("TEST")),
853            venue: None,
854            data_type: DataType::new("QuoteTick", None),
855            command_id: UUID4::new(),
856            ts_init: UnixNanos::default(),
857            params: None,
858        }));
859        data_cmd_tx.send(command).unwrap();
860
861        // Send time event
862        let event = TimeEvent::new(
863            Ustr::from("test"),
864            UUID4::new(),
865            UnixNanos::from(1),
866            UnixNanos::from(2),
867        );
868        let callback = TimeEventCallback::from(|_: TimeEvent| {});
869        let handler = TimeEventHandlerV2::new(event, callback);
870        time_evt_tx.send(handler).unwrap();
871
872        // Send execution order event
873        let order_event = OrderSubmitted::new(
874            TraderId::from("TRADER-001"),
875            StrategyId::from("S-001"),
876            InstrumentId::from("EUR/USD.SIM"),
877            ClientOrderId::from("O-001"),
878            AccountId::from("SIM-001"),
879            UUID4::new(),
880            UnixNanos::from(1),
881            UnixNanos::from(2),
882        );
883        exec_evt_tx
884            .send(ExecutionEvent::Order(OrderEventAny::Submitted(order_event)))
885            .unwrap();
886
887        // Send execution report (OrderStatus)
888        let order_status = OrderStatusReport::new(
889            AccountId::from("SIM-001"),
890            InstrumentId::from("EUR/USD.SIM"),
891            Some(ClientOrderId::from("O-001")),
892            VenueOrderId::from("V-001"),
893            OrderSide::Buy,
894            OrderType::Market,
895            TimeInForce::Gtc,
896            OrderStatus::Accepted,
897            Quantity::from(100_000),
898            Quantity::from(100_000),
899            UnixNanos::from(1),
900            UnixNanos::from(2),
901            UnixNanos::from(3),
902            None,
903        );
904        exec_evt_tx
905            .send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
906                Box::new(order_status),
907            )))
908            .unwrap();
909
910        // Send execution report (Fill)
911        let fill = FillReport::new(
912            AccountId::from("SIM-001"),
913            InstrumentId::from("EUR/USD.SIM"),
914            VenueOrderId::from("V-001"),
915            TradeId::from("T-001"),
916            OrderSide::Buy,
917            Quantity::from(100_000),
918            Price::from("1.10000"),
919            Money::from("10 USD"),
920            LiquiditySide::Taker,
921            Some(ClientOrderId::from("O-001")),
922            None,
923            UnixNanos::from(1),
924            UnixNanos::from(2),
925            None,
926        );
927        exec_evt_tx
928            .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
929                fill,
930            ))))
931            .unwrap();
932
933        // Send execution report (Position)
934        let position = PositionStatusReport::new(
935            AccountId::from("SIM-001"),
936            InstrumentId::from("EUR/USD.SIM"),
937            PositionSideSpecified::Long,
938            Quantity::from(100_000),
939            UnixNanos::from(1),
940            UnixNanos::from(2),
941            None,
942            Some(PositionId::from("P-001")),
943            None,
944        );
945        exec_evt_tx
946            .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
947                position,
948            ))))
949            .unwrap();
950
951        // Send account event
952        let account_state = AccountState::new(
953            AccountId::from("SIM-001"),
954            AccountType::Cash,
955            vec![],
956            vec![],
957            true,
958            UUID4::new(),
959            UnixNanos::from(1),
960            UnixNanos::from(2),
961            None,
962        );
963        exec_evt_tx
964            .send(ExecutionEvent::Account(account_state))
965            .unwrap();
966
967        // Give runner time to process all events
968        tokio::time::sleep(Duration::from_millis(100)).await;
969
970        // Stop runner
971        signal_tx.send(()).unwrap();
972
973        let result = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
974        assert!(
975            result.is_ok(),
976            "Runner should process all event types and stop cleanly"
977        );
978    }
979
980    #[tokio::test]
981    async fn test_runner_handle_stops_runner() {
982        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
983        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
984        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
985        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
986        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
987        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
988
989        let mut runner = create_test_runner(
990            time_evt_rx,
991            data_evt_rx,
992            data_cmd_rx,
993            exec_evt_rx,
994            exec_cmd_rx,
995            signal_rx,
996            signal_tx.clone(),
997        );
998
999        // Get handle before moving runner
1000        let handle = runner.handle();
1001
1002        let runner_task = tokio::spawn(async move {
1003            runner.run().await;
1004        });
1005
1006        // Use handle to stop
1007        handle.stop();
1008
1009        let result = tokio::time::timeout(Duration::from_millis(100), runner_task).await;
1010        assert!(result.is_ok(), "Runner should stop via handle");
1011    }
1012
1013    #[tokio::test]
1014    async fn test_runner_handle_is_cloneable() {
1015        let (signal_tx, _signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1016        let handle = AsyncRunnerHandle { signal_tx };
1017
1018        let handle2 = handle.clone();
1019
1020        // Both handles should be able to send stop signals
1021        assert!(handle.signal_tx.send(()).is_ok());
1022        assert!(handle2.signal_tx.send(()).is_ok());
1023    }
1024
1025    #[tokio::test]
1026    async fn test_runner_processes_events_before_stop() {
1027        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1028        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1029        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
1030        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1031        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1032        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1033
1034        let mut runner = create_test_runner(
1035            time_evt_rx,
1036            data_evt_rx,
1037            data_cmd_rx,
1038            exec_evt_rx,
1039            exec_cmd_rx,
1040            signal_rx,
1041            signal_tx.clone(),
1042        );
1043
1044        let handle = runner.handle();
1045
1046        // Send events before starting runner
1047        for _ in 0..10 {
1048            let quote = test_quote();
1049            data_evt_tx
1050                .send(DataEvent::Data(Data::Quote(quote)))
1051                .unwrap();
1052        }
1053
1054        let runner_task = tokio::spawn(async move {
1055            runner.run().await;
1056        });
1057
1058        // Give runner time to process queued events
1059        tokio::time::sleep(Duration::from_millis(50)).await;
1060
1061        // Stop runner
1062        handle.stop();
1063
1064        let result = tokio::time::timeout(Duration::from_millis(200), runner_task).await;
1065        assert!(result.is_ok(), "Runner should process events and stop");
1066    }
1067}