Skip to main content

nautilus_live/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19    live::runner::{set_data_event_sender, set_exec_event_sender},
20    messages::{
21        DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
22    },
23    msgbus::{self, MessagingSwitchboard},
24    runner::{
25        DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
26        set_exec_cmd_sender, set_time_event_sender,
27    },
28    timer::TimeEventHandler,
29};
30
31/// Asynchronous implementation of `DataCommandSender` for live environments.
32#[derive(Debug)]
33pub struct AsyncDataCommandSender {
34    cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
35}
36
37impl AsyncDataCommandSender {
38    #[must_use]
39    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>) -> Self {
40        Self { cmd_tx }
41    }
42}
43
44impl DataCommandSender for AsyncDataCommandSender {
45    fn execute(&self, command: DataCommand) {
46        if let Err(e) = self.cmd_tx.send(command) {
47            log::error!("Failed to send data command: {e}");
48        }
49    }
50}
51
52/// Asynchronous implementation of `TimeEventSender` for live environments.
53#[derive(Debug, Clone)]
54pub struct AsyncTimeEventSender {
55    time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>,
56}
57
58impl AsyncTimeEventSender {
59    #[must_use]
60    pub const fn new(time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>) -> Self {
61        Self { time_tx }
62    }
63
64    /// Gets a clone of the underlying channel sender for async use.
65    ///
66    /// This allows async contexts to get a direct channel sender that
67    /// can be moved into async tasks without `RefCell` borrowing issues.
68    #[must_use]
69    pub fn get_channel_sender(&self) -> tokio::sync::mpsc::UnboundedSender<TimeEventHandler> {
70        self.time_tx.clone()
71    }
72}
73
74impl TimeEventSender for AsyncTimeEventSender {
75    fn send(&self, handler: TimeEventHandler) {
76        if let Err(e) = self.time_tx.send(handler) {
77            log::error!("Failed to send time event handler: {e}");
78        }
79    }
80}
81
82/// Asynchronous implementation of `TradingCommandSender` for live environments.
83#[derive(Debug)]
84pub struct AsyncTradingCommandSender {
85    cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
86}
87
88impl AsyncTradingCommandSender {
89    #[must_use]
90    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>) -> Self {
91        Self { cmd_tx }
92    }
93}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96    fn execute(&self, command: TradingCommand) {
97        if let Err(e) = self.cmd_tx.send(command) {
98            log::error!("Failed to send trading command: {e}");
99        }
100    }
101}
102
103pub trait Runner {
104    fn run(&mut self);
105}
106
107/// Channel receivers for the async event loop.
108///
109/// These can be extracted from `AsyncRunner` via `take_channels()` to drive
110/// the event loop directly on the same thread as the msgbus endpoints.
111#[derive(Debug)]
112pub struct AsyncRunnerChannels {
113    pub time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
114    pub data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
115    pub data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
116    pub exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
117    pub exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
118}
119
120pub struct AsyncRunner {
121    channels: AsyncRunnerChannels,
122    signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
123    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
124}
125
126/// Handle for stopping the AsyncRunner from another context.
127#[derive(Clone, Debug)]
128pub struct AsyncRunnerHandle {
129    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
130}
131
132impl AsyncRunnerHandle {
133    /// Signals the runner to stop.
134    pub fn stop(&self) {
135        if let Err(e) = self.signal_tx.send(()) {
136            log::error!("Failed to send shutdown signal: {e}");
137        }
138    }
139}
140
141impl Default for AsyncRunner {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147impl Debug for AsyncRunner {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.debug_struct(stringify!(AsyncRunner)).finish()
150    }
151}
152
153impl AsyncRunner {
154    /// Creates a new [`AsyncRunner`] instance.
155    #[must_use]
156    pub fn new() -> Self {
157        use tokio::sync::mpsc::unbounded_channel; // tokio-import-ok
158
159        let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandler>();
160        let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
161        let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
162        let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
163        let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
164        let (signal_tx, signal_rx) = unbounded_channel::<()>();
165
166        set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
167        set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
168        set_data_event_sender(data_evt_tx);
169        set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
170        set_exec_event_sender(exec_evt_tx);
171
172        Self {
173            channels: AsyncRunnerChannels {
174                time_evt_rx,
175                data_evt_rx,
176                data_cmd_rx,
177                exec_evt_rx,
178                exec_cmd_rx,
179            },
180            signal_rx,
181            signal_tx,
182        }
183    }
184
185    /// Stops the runner with an internal shutdown signal.
186    pub fn stop(&self) {
187        if let Err(e) = self.signal_tx.send(()) {
188            log::error!("Failed to send shutdown signal: {e}");
189        }
190    }
191
192    /// Returns a handle that can be used to stop the runner from another context.
193    #[must_use]
194    pub fn handle(&self) -> AsyncRunnerHandle {
195        AsyncRunnerHandle {
196            signal_tx: self.signal_tx.clone(),
197        }
198    }
199
200    /// Consumes the runner and returns the channel receivers for direct event loop driving.
201    ///
202    /// This is used when the event loop needs to run on the same thread as the msgbus
203    /// endpoints (which use thread-local storage).
204    #[must_use]
205    pub fn take_channels(self) -> AsyncRunnerChannels {
206        self.channels
207    }
208
209    /// Drains all pending data events from the channel and processes them.
210    pub fn drain_pending_data_events(&mut self) {
211        let mut count = 0;
212        while let Ok(evt) = self.channels.data_evt_rx.try_recv() {
213            Self::handle_data_event(evt);
214            count += 1;
215        }
216        if count > 0 {
217            log::debug!("Drained {count} pending data events");
218        }
219    }
220
221    /// Runs the async runner event loop.
222    ///
223    /// This method processes data events, time events, execution events, and signal events in an async loop.
224    /// It will run until a signal is received or the event streams are closed.
225    pub async fn run(&mut self) {
226        log::info!("AsyncRunner starting");
227
228        loop {
229            tokio::select! {
230                Some(()) = self.signal_rx.recv() => {
231                    log::info!("AsyncRunner received signal, shutting down");
232                    return;
233                },
234                Some(handler) = self.channels.time_evt_rx.recv() => {
235                    Self::handle_time_event(handler);
236                },
237                Some(cmd) = self.channels.data_cmd_rx.recv() => {
238                    Self::handle_data_command(cmd);
239                },
240                Some(evt) = self.channels.data_evt_rx.recv() => {
241                    Self::handle_data_event(evt);
242                },
243                Some(cmd) = self.channels.exec_cmd_rx.recv() => {
244                    Self::handle_exec_command(cmd);
245                },
246                Some(evt) = self.channels.exec_evt_rx.recv() => {
247                    Self::handle_exec_event(evt);
248                },
249                else => {
250                    log::debug!("AsyncRunner all channels closed, exiting");
251                    return;
252                }
253            };
254        }
255    }
256
257    /// Handles a time event by running its callback.
258    #[inline]
259    pub fn handle_time_event(handler: TimeEventHandler) {
260        handler.run();
261    }
262
263    /// Handles a data command by sending to the DataEngine.
264    #[inline]
265    pub fn handle_data_command(cmd: DataCommand) {
266        msgbus::send_data_command(MessagingSwitchboard::data_engine_execute(), cmd);
267    }
268
269    /// Handles a data event by sending to the appropriate DataEngine endpoint.
270    #[inline]
271    pub fn handle_data_event(event: DataEvent) {
272        match event {
273            DataEvent::Data(data) => {
274                msgbus::send_data(MessagingSwitchboard::data_engine_process_data(), data);
275            }
276            DataEvent::Instrument(data) => {
277                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
278            }
279            DataEvent::Response(resp) => {
280                msgbus::send_data_response(MessagingSwitchboard::data_engine_response(), resp);
281            }
282            DataEvent::FundingRate(funding_rate) => {
283                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &funding_rate);
284            }
285            #[cfg(feature = "defi")]
286            DataEvent::DeFi(data) => {
287                msgbus::send_defi_data(MessagingSwitchboard::data_engine_process_defi_data(), data);
288            }
289        }
290    }
291
292    /// Handles an execution command by sending to the ExecEngine.
293    #[inline]
294    pub fn handle_exec_command(cmd: TradingCommand) {
295        msgbus::send_trading_command(MessagingSwitchboard::exec_engine_execute(), cmd);
296    }
297
298    /// Handles an execution event by sending to the appropriate engine endpoint.
299    #[inline]
300    pub fn handle_exec_event(event: ExecutionEvent) {
301        match event {
302            ExecutionEvent::Order(order_event) => {
303                msgbus::send_order_event(MessagingSwitchboard::exec_engine_process(), order_event);
304            }
305            ExecutionEvent::Report(report) => {
306                Self::handle_exec_report(report);
307            }
308            ExecutionEvent::Account(ref account) => {
309                msgbus::send_account_state(
310                    MessagingSwitchboard::portfolio_update_account(),
311                    account,
312                );
313            }
314        }
315    }
316
317    #[inline]
318    pub fn handle_exec_report(report: ExecutionReport) {
319        let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
320        msgbus::send_execution_report(endpoint, report);
321    }
322}
323
324#[cfg(test)]
325mod tests {
326    use std::time::Duration;
327
328    use nautilus_common::{
329        messages::{
330            ExecutionEvent, ExecutionReport,
331            data::{SubscribeCommand, SubscribeCustomData},
332            execution::{CancelAllOrders, TradingCommand},
333        },
334        timer::{TimeEvent, TimeEventCallback, TimeEventHandler},
335    };
336    use nautilus_core::{UUID4, UnixNanos};
337    use nautilus_model::{
338        data::{Data, DataType, quote::QuoteTick},
339        enums::{
340            AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
341            TimeInForce,
342        },
343        events::{OrderEvent, OrderEventAny, OrderSubmitted, account::state::AccountState},
344        identifiers::{
345            AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
346            TraderId, VenueOrderId,
347        },
348        reports::{FillReport, OrderStatusReport, PositionStatusReport},
349        types::{Money, Price, Quantity},
350    };
351    use rstest::rstest;
352    use ustr::Ustr;
353
354    use super::*;
355
356    // Test fixture for creating test quotes
357    fn test_quote() -> QuoteTick {
358        QuoteTick {
359            instrument_id: InstrumentId::from("EUR/USD.SIM"),
360            bid_price: Price::from("1.10000"),
361            ask_price: Price::from("1.10001"),
362            bid_size: Quantity::from(1_000_000),
363            ask_size: Quantity::from(1_000_000),
364            ts_event: UnixNanos::default(),
365            ts_init: UnixNanos::default(),
366        }
367    }
368
369    // Test helper to create AsyncRunner with manual channels
370    fn create_test_runner(
371        time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
372        data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
373        data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
374        exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
375        exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
376        signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
377        signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
378    ) -> AsyncRunner {
379        AsyncRunner {
380            channels: AsyncRunnerChannels {
381                time_evt_rx,
382                data_evt_rx,
383                data_cmd_rx,
384                exec_evt_rx,
385                exec_cmd_rx,
386            },
387            signal_rx,
388            signal_tx,
389        }
390    }
391
392    #[rstest]
393    fn test_async_data_command_sender_creation() {
394        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
395        let sender = AsyncDataCommandSender::new(tx);
396        assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
397    }
398
399    #[rstest]
400    fn test_async_time_event_sender_creation() {
401        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
402        let sender = AsyncTimeEventSender::new(tx);
403        assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
404    }
405
406    #[rstest]
407    fn test_async_time_event_sender_get_channel() {
408        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
409        let sender = AsyncTimeEventSender::new(tx);
410        let channel = sender.get_channel_sender();
411
412        // Verify the channel is functional
413        let event = TimeEvent::new(
414            Ustr::from("test"),
415            UUID4::new(),
416            UnixNanos::from(1),
417            UnixNanos::from(2),
418        );
419        let callback = TimeEventCallback::from(|_: TimeEvent| {});
420        let handler = TimeEventHandler::new(event, callback);
421
422        assert!(channel.send(handler).is_ok());
423    }
424
425    #[tokio::test]
426    async fn test_async_data_command_sender_execute() {
427        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
428        let sender = AsyncDataCommandSender::new(tx);
429
430        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
431            client_id: Some(ClientId::from("TEST")),
432            venue: None,
433            data_type: DataType::new("QuoteTick", None),
434            command_id: UUID4::new(),
435            ts_init: UnixNanos::default(),
436            correlation_id: None,
437            params: None,
438        }));
439
440        sender.execute(command.clone());
441
442        let received = rx.recv().await.unwrap();
443        match (received, command) {
444            (
445                DataCommand::Subscribe(SubscribeCommand::Data(r)),
446                DataCommand::Subscribe(SubscribeCommand::Data(c)),
447            ) => {
448                assert_eq!(r.client_id, c.client_id);
449                assert_eq!(r.data_type, c.data_type);
450            }
451            _ => panic!("Command mismatch"),
452        }
453    }
454
455    #[tokio::test]
456    async fn test_async_time_event_sender_send() {
457        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
458        let sender = AsyncTimeEventSender::new(tx);
459
460        let event = TimeEvent::new(
461            Ustr::from("test"),
462            UUID4::new(),
463            UnixNanos::from(1),
464            UnixNanos::from(2),
465        );
466        let callback = TimeEventCallback::from(|_: TimeEvent| {});
467        let handler = TimeEventHandler::new(event, callback);
468
469        sender.send(handler);
470
471        assert!(rx.recv().await.is_some());
472    }
473
474    #[tokio::test]
475    async fn test_runner_shutdown_signal() {
476        // Create runner with manual channels to avoid global state
477        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
478        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
479        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
480        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
481        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
482        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
483
484        let mut runner = create_test_runner(
485            time_evt_rx,
486            data_evt_rx,
487            data_cmd_rx,
488            exec_evt_rx,
489            exec_cmd_rx,
490            signal_rx,
491            signal_tx.clone(),
492        );
493
494        // Start runner
495        let runner_handle = tokio::spawn(async move {
496            runner.run().await;
497        });
498
499        // Send shutdown signal
500        signal_tx.send(()).unwrap();
501
502        // Runner should stop quickly
503        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
504        assert!(result.is_ok(), "Runner should stop on signal");
505    }
506
507    #[tokio::test]
508    async fn test_runner_closes_on_channel_drop() {
509        let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
510        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
511        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
512        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
513        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
514        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
515
516        let mut runner = create_test_runner(
517            time_evt_rx,
518            data_evt_rx,
519            data_cmd_rx,
520            exec_evt_rx,
521            exec_cmd_rx,
522            signal_rx,
523            signal_tx.clone(),
524        );
525
526        // Start runner
527        let runner_handle = tokio::spawn(async move {
528            runner.run().await;
529        });
530
531        drop(data_tx);
532
533        // Yield to let runner enter event loop before stop signal
534        tokio::task::yield_now().await;
535        signal_tx.send(()).ok();
536
537        // Runner should stop when channels close or on signal
538        let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
539        assert!(
540            result.is_ok(),
541            "Runner should stop when channels close or on signal"
542        );
543    }
544
545    #[tokio::test]
546    async fn test_concurrent_event_sending() {
547        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
548        let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
549        let (_time_evt_tx, time_evt_rx) =
550            tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
551        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
552        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
553        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
554
555        // Setup runner
556        let mut runner = create_test_runner(
557            time_evt_rx,
558            data_evt_rx,
559            data_cmd_rx,
560            exec_evt_rx,
561            exec_cmd_rx,
562            signal_rx,
563            signal_tx.clone(),
564        );
565
566        // Spawn multiple concurrent senders
567        let mut handles = vec![];
568        for _ in 0..5 {
569            let tx_clone = data_evt_tx.clone();
570            let handle = tokio::spawn(async move {
571                for _ in 0..20 {
572                    let quote = test_quote();
573                    tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
574                    tokio::task::yield_now().await;
575                }
576            });
577            handles.push(handle);
578        }
579
580        // Start runner in background
581        let runner_handle = tokio::spawn(async move {
582            runner.run().await;
583        });
584
585        // Wait for all senders
586        for handle in handles {
587            handle.await.unwrap();
588        }
589
590        // Yield to let runner enter event loop before stop signal
591        tokio::task::yield_now().await;
592        signal_tx.send(()).unwrap();
593
594        let _ = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
595    }
596
597    #[rstest]
598    #[case(10)]
599    #[case(100)]
600    #[case(1000)]
601    fn test_channel_send_performance(#[case] count: usize) {
602        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
603        let quote = test_quote();
604
605        // Send events
606        for _ in 0..count {
607            tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
608        }
609
610        // Verify all received
611        let mut received = 0;
612        while rx.try_recv().is_ok() {
613            received += 1;
614        }
615
616        assert_eq!(received, count);
617    }
618
619    #[rstest]
620    fn test_async_trading_command_sender_creation() {
621        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
622        let sender = AsyncTradingCommandSender::new(tx);
623        assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
624    }
625
626    #[tokio::test]
627    async fn test_async_trading_command_sender_execute() {
628        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
629        let sender = AsyncTradingCommandSender::new(tx);
630
631        let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
632            TraderId::from("TRADER-001"),
633            None,
634            StrategyId::from("S-001"),
635            InstrumentId::from("EUR/USD.SIM"),
636            OrderSide::Buy,
637            UUID4::new(),
638            UnixNanos::default(),
639            None,
640        ));
641
642        sender.execute(command);
643
644        let received = rx.recv().await;
645        assert!(received.is_some());
646        assert!(matches!(
647            received.unwrap(),
648            TradingCommand::CancelAllOrders(_)
649        ));
650    }
651
652    #[tokio::test]
653    async fn test_runner_processes_trading_commands() {
654        let (_data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
655        let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
656        let (_time_evt_tx, time_evt_rx) =
657            tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
658        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
659        let (exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
660        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
661
662        let mut runner = create_test_runner(
663            time_evt_rx,
664            data_evt_rx,
665            data_cmd_rx,
666            exec_evt_rx,
667            exec_cmd_rx,
668            signal_rx,
669            signal_tx.clone(),
670        );
671
672        let runner_handle = tokio::spawn(async move {
673            runner.run().await;
674        });
675
676        let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
677            TraderId::from("TRADER-001"),
678            None,
679            StrategyId::from("S-001"),
680            InstrumentId::from("EUR/USD.SIM"),
681            OrderSide::Buy,
682            UUID4::new(),
683            UnixNanos::default(),
684            None,
685        ));
686        exec_cmd_tx.send(command).unwrap();
687
688        tokio::task::yield_now().await;
689        signal_tx.send(()).unwrap();
690
691        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
692        assert!(result.is_ok(), "Runner should process command and stop");
693    }
694
695    #[tokio::test]
696    async fn test_runner_processes_multiple_trading_commands() {
697        let (_data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
698        let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
699        let (_time_evt_tx, time_evt_rx) =
700            tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
701        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
702        let (exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
703        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
704
705        let mut runner = create_test_runner(
706            time_evt_rx,
707            data_evt_rx,
708            data_cmd_rx,
709            exec_evt_rx,
710            exec_cmd_rx,
711            signal_rx,
712            signal_tx.clone(),
713        );
714
715        let runner_handle = tokio::spawn(async move {
716            runner.run().await;
717        });
718
719        for i in 0..10 {
720            let strategy_id = format!("S-{i:03}");
721            let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
722                TraderId::from("TRADER-001"),
723                None,
724                StrategyId::from(strategy_id.as_str()),
725                InstrumentId::from("EUR/USD.SIM"),
726                OrderSide::Buy,
727                UUID4::new(),
728                UnixNanos::default(),
729                None,
730            ));
731            exec_cmd_tx.send(command).unwrap();
732        }
733
734        tokio::task::yield_now().await;
735        signal_tx.send(()).unwrap();
736
737        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
738        assert!(
739            result.is_ok(),
740            "Runner should process all commands and stop"
741        );
742    }
743
744    #[tokio::test]
745    async fn test_execution_event_order_channel() {
746        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
747
748        let event = OrderSubmitted::new(
749            TraderId::from("TRADER-001"),
750            StrategyId::from("S-001"),
751            InstrumentId::from("EUR/USD.SIM"),
752            ClientOrderId::from("O-001"),
753            AccountId::from("SIM-001"),
754            UUID4::new(),
755            UnixNanos::from(1),
756            UnixNanos::from(2),
757        );
758
759        tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
760            .unwrap();
761
762        let received = rx.recv().await.unwrap();
763        match received {
764            ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
765                assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
766            }
767            _ => panic!("Expected OrderSubmitted event"),
768        }
769    }
770
771    #[tokio::test]
772    async fn test_execution_report_order_status_channel() {
773        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
774
775        let report = OrderStatusReport::new(
776            AccountId::from("SIM-001"),
777            InstrumentId::from("EUR/USD.SIM"),
778            Some(ClientOrderId::from("O-001")),
779            VenueOrderId::from("V-001"),
780            OrderSide::Buy,
781            OrderType::Market,
782            TimeInForce::Gtc,
783            OrderStatus::Accepted,
784            Quantity::from(100_000),
785            Quantity::from(100_000),
786            UnixNanos::from(1),
787            UnixNanos::from(2),
788            UnixNanos::from(3),
789            None,
790        );
791
792        tx.send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
793            report,
794        ))))
795        .unwrap();
796
797        let received = rx.recv().await.unwrap();
798        match received {
799            ExecutionEvent::Report(ExecutionReport::Order(r)) => {
800                assert_eq!(r.venue_order_id.as_str(), "V-001");
801                assert_eq!(r.order_status, OrderStatus::Accepted);
802            }
803            _ => panic!("Expected OrderStatusReport"),
804        }
805    }
806
807    #[tokio::test]
808    async fn test_execution_report_fill() {
809        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
810
811        let report = FillReport::new(
812            AccountId::from("SIM-001"),
813            InstrumentId::from("EUR/USD.SIM"),
814            VenueOrderId::from("V-001"),
815            TradeId::from("T-001"),
816            OrderSide::Buy,
817            Quantity::from(100_000),
818            Price::from("1.10000"),
819            Money::from("10 USD"),
820            LiquiditySide::Taker,
821            Some(ClientOrderId::from("O-001")),
822            None,
823            UnixNanos::from(1),
824            UnixNanos::from(2),
825            None,
826        );
827
828        tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
829            report,
830        ))))
831        .unwrap();
832
833        let received = rx.recv().await.unwrap();
834        match received {
835            ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
836                assert_eq!(r.venue_order_id.as_str(), "V-001");
837                assert_eq!(r.trade_id.to_string(), "T-001");
838            }
839            _ => panic!("Expected FillReport"),
840        }
841    }
842
843    #[tokio::test]
844    async fn test_execution_report_position() {
845        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
846
847        let report = PositionStatusReport::new(
848            AccountId::from("SIM-001"),
849            InstrumentId::from("EUR/USD.SIM"),
850            PositionSideSpecified::Long,
851            Quantity::from(100_000),
852            UnixNanos::from(1),
853            UnixNanos::from(2),
854            None,
855            Some(PositionId::from("P-001")),
856            None,
857        );
858
859        tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
860            report,
861        ))))
862        .unwrap();
863
864        let received = rx.recv().await.unwrap();
865        match received {
866            ExecutionEvent::Report(ExecutionReport::Position(r)) => {
867                assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
868            }
869            _ => panic!("Expected PositionStatusReport"),
870        }
871    }
872
873    #[tokio::test]
874    async fn test_execution_event_account() {
875        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
876
877        let account_state = AccountState::new(
878            AccountId::from("SIM-001"),
879            AccountType::Cash,
880            vec![],
881            vec![],
882            true,
883            UUID4::new(),
884            UnixNanos::from(1),
885            UnixNanos::from(2),
886            None,
887        );
888
889        tx.send(ExecutionEvent::Account(account_state)).unwrap();
890
891        let received = rx.recv().await.unwrap();
892        match received {
893            ExecutionEvent::Account(r) => {
894                assert_eq!(r.account_id.as_str(), "SIM-001");
895            }
896            _ => panic!("Expected AccountState"),
897        }
898    }
899
900    #[tokio::test]
901    async fn test_runner_stop_method() {
902        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
903        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
904        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
905        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
906        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
907        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
908
909        let mut runner = create_test_runner(
910            time_evt_rx,
911            data_evt_rx,
912            data_cmd_rx,
913            exec_evt_rx,
914            exec_cmd_rx,
915            signal_rx,
916            signal_tx.clone(),
917        );
918
919        let runner_handle = tokio::spawn(async move {
920            runner.run().await;
921        });
922
923        // Use stop via signal_tx directly
924        signal_tx.send(()).unwrap();
925
926        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
927        assert!(result.is_ok(), "Runner should stop when stop() is called");
928    }
929
930    #[tokio::test]
931    async fn test_all_event_types_integration() {
932        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
933        let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
934        let (time_evt_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
935        let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
936        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
937        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
938
939        let mut runner = create_test_runner(
940            time_evt_rx,
941            data_evt_rx,
942            data_cmd_rx,
943            exec_evt_rx,
944            exec_cmd_rx,
945            signal_rx,
946            signal_tx.clone(),
947        );
948
949        let runner_handle = tokio::spawn(async move {
950            runner.run().await;
951        });
952
953        // Send data event
954        let quote = test_quote();
955        data_evt_tx
956            .send(DataEvent::Data(Data::Quote(quote)))
957            .unwrap();
958
959        // Send data command
960        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
961            client_id: Some(ClientId::from("TEST")),
962            venue: None,
963            data_type: DataType::new("QuoteTick", None),
964            command_id: UUID4::new(),
965            ts_init: UnixNanos::default(),
966            correlation_id: None,
967            params: None,
968        }));
969        data_cmd_tx.send(command).unwrap();
970
971        // Send time event
972        let event = TimeEvent::new(
973            Ustr::from("test"),
974            UUID4::new(),
975            UnixNanos::from(1),
976            UnixNanos::from(2),
977        );
978        let callback = TimeEventCallback::from(|_: TimeEvent| {});
979        let handler = TimeEventHandler::new(event, callback);
980        time_evt_tx.send(handler).unwrap();
981
982        // Send execution order event
983        let order_event = OrderSubmitted::new(
984            TraderId::from("TRADER-001"),
985            StrategyId::from("S-001"),
986            InstrumentId::from("EUR/USD.SIM"),
987            ClientOrderId::from("O-001"),
988            AccountId::from("SIM-001"),
989            UUID4::new(),
990            UnixNanos::from(1),
991            UnixNanos::from(2),
992        );
993        exec_evt_tx
994            .send(ExecutionEvent::Order(OrderEventAny::Submitted(order_event)))
995            .unwrap();
996
997        // Send execution report (OrderStatus)
998        let order_status = OrderStatusReport::new(
999            AccountId::from("SIM-001"),
1000            InstrumentId::from("EUR/USD.SIM"),
1001            Some(ClientOrderId::from("O-001")),
1002            VenueOrderId::from("V-001"),
1003            OrderSide::Buy,
1004            OrderType::Market,
1005            TimeInForce::Gtc,
1006            OrderStatus::Accepted,
1007            Quantity::from(100_000),
1008            Quantity::from(100_000),
1009            UnixNanos::from(1),
1010            UnixNanos::from(2),
1011            UnixNanos::from(3),
1012            None,
1013        );
1014        exec_evt_tx
1015            .send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
1016                order_status,
1017            ))))
1018            .unwrap();
1019
1020        // Send execution report (Fill)
1021        let fill = FillReport::new(
1022            AccountId::from("SIM-001"),
1023            InstrumentId::from("EUR/USD.SIM"),
1024            VenueOrderId::from("V-001"),
1025            TradeId::from("T-001"),
1026            OrderSide::Buy,
1027            Quantity::from(100_000),
1028            Price::from("1.10000"),
1029            Money::from("10 USD"),
1030            LiquiditySide::Taker,
1031            Some(ClientOrderId::from("O-001")),
1032            None,
1033            UnixNanos::from(1),
1034            UnixNanos::from(2),
1035            None,
1036        );
1037        exec_evt_tx
1038            .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
1039                fill,
1040            ))))
1041            .unwrap();
1042
1043        // Send execution report (Position)
1044        let position = PositionStatusReport::new(
1045            AccountId::from("SIM-001"),
1046            InstrumentId::from("EUR/USD.SIM"),
1047            PositionSideSpecified::Long,
1048            Quantity::from(100_000),
1049            UnixNanos::from(1),
1050            UnixNanos::from(2),
1051            None,
1052            Some(PositionId::from("P-001")),
1053            None,
1054        );
1055        exec_evt_tx
1056            .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
1057                position,
1058            ))))
1059            .unwrap();
1060
1061        // Send account event
1062        let account_state = AccountState::new(
1063            AccountId::from("SIM-001"),
1064            AccountType::Cash,
1065            vec![],
1066            vec![],
1067            true,
1068            UUID4::new(),
1069            UnixNanos::from(1),
1070            UnixNanos::from(2),
1071            None,
1072        );
1073        exec_evt_tx
1074            .send(ExecutionEvent::Account(account_state))
1075            .unwrap();
1076
1077        // Yield to let runner enter event loop before stop signal
1078        tokio::task::yield_now().await;
1079        signal_tx.send(()).unwrap();
1080
1081        let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
1082        assert!(
1083            result.is_ok(),
1084            "Runner should process all event types and stop cleanly"
1085        );
1086    }
1087
1088    #[tokio::test]
1089    async fn test_runner_handle_stops_runner() {
1090        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1091        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1092        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1093        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1094        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1095        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1096
1097        let mut runner = create_test_runner(
1098            time_evt_rx,
1099            data_evt_rx,
1100            data_cmd_rx,
1101            exec_evt_rx,
1102            exec_cmd_rx,
1103            signal_rx,
1104            signal_tx.clone(),
1105        );
1106
1107        // Get handle before moving runner
1108        let handle = runner.handle();
1109
1110        let runner_task = tokio::spawn(async move {
1111            runner.run().await;
1112        });
1113
1114        // Use handle to stop
1115        handle.stop();
1116
1117        let result = tokio::time::timeout(Duration::from_millis(100), runner_task).await;
1118        assert!(result.is_ok(), "Runner should stop via handle");
1119    }
1120
1121    #[tokio::test]
1122    async fn test_runner_handle_is_cloneable() {
1123        let (signal_tx, _signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1124        let handle = AsyncRunnerHandle { signal_tx };
1125
1126        let handle2 = handle.clone();
1127
1128        // Both handles should be able to send stop signals
1129        assert!(handle.signal_tx.send(()).is_ok());
1130        assert!(handle2.signal_tx.send(()).is_ok());
1131    }
1132
1133    #[tokio::test]
1134    async fn test_runner_processes_events_before_stop() {
1135        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1136        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1137        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1138        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1139        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1140        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1141
1142        let mut runner = create_test_runner(
1143            time_evt_rx,
1144            data_evt_rx,
1145            data_cmd_rx,
1146            exec_evt_rx,
1147            exec_cmd_rx,
1148            signal_rx,
1149            signal_tx.clone(),
1150        );
1151
1152        let handle = runner.handle();
1153
1154        // Send events before starting runner
1155        for _ in 0..10 {
1156            let quote = test_quote();
1157            data_evt_tx
1158                .send(DataEvent::Data(Data::Quote(quote)))
1159                .unwrap();
1160        }
1161
1162        let runner_task = tokio::spawn(async move {
1163            runner.run().await;
1164        });
1165
1166        // Yield to let runner enter event loop before stop signal
1167        tokio::task::yield_now().await;
1168        handle.stop();
1169
1170        let result = tokio::time::timeout(Duration::from_millis(200), runner_task).await;
1171        assert!(result.is_ok(), "Runner should process events and stop");
1172    }
1173}