nautilus_live/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{any::Any, fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19    live::runner::{set_data_event_sender, set_exec_event_sender},
20    messages::{
21        DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
22    },
23    msgbus::{self, switchboard::MessagingSwitchboard},
24    runner::{
25        DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
26        set_exec_cmd_sender, set_time_event_sender,
27    },
28    timer::TimeEventHandler,
29};
30
/// Asynchronous implementation of `DataCommandSender` for live environments.
///
/// Wraps the sending half of an unbounded tokio channel that carries
/// [`DataCommand`] values.
#[derive(Debug)]
pub struct AsyncDataCommandSender {
    /// Sending half of the data command channel.
    cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
}
36
impl AsyncDataCommandSender {
    /// Creates a new [`AsyncDataCommandSender`] that sends on `cmd_tx`.
    #[must_use]
    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>) -> Self {
        Self { cmd_tx }
    }
}
43
44impl DataCommandSender for AsyncDataCommandSender {
45    fn execute(&self, command: DataCommand) {
46        if let Err(e) = self.cmd_tx.send(command) {
47            log::error!("Failed to send data command: {e}");
48        }
49    }
50}
51
/// Asynchronous implementation of `TimeEventSender` for live environments.
///
/// Wraps the sending half of an unbounded tokio channel that carries
/// [`TimeEventHandler`] values.
#[derive(Debug, Clone)]
pub struct AsyncTimeEventSender {
    /// Sending half of the time event channel.
    time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>,
}
57
58impl AsyncTimeEventSender {
59    #[must_use]
60    pub const fn new(time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>) -> Self {
61        Self { time_tx }
62    }
63
64    /// Gets a clone of the underlying channel sender for async use.
65    ///
66    /// This allows async contexts to get a direct channel sender that
67    /// can be moved into async tasks without `RefCell` borrowing issues.
68    #[must_use]
69    pub fn get_channel_sender(&self) -> tokio::sync::mpsc::UnboundedSender<TimeEventHandler> {
70        self.time_tx.clone()
71    }
72}
73
74impl TimeEventSender for AsyncTimeEventSender {
75    fn send(&self, handler: TimeEventHandler) {
76        if let Err(e) = self.time_tx.send(handler) {
77            log::error!("Failed to send time event handler: {e}");
78        }
79    }
80}
81
/// Asynchronous implementation of `TradingCommandSender` for live environments.
///
/// Wraps the sending half of an unbounded tokio channel that carries
/// [`TradingCommand`] values.
#[derive(Debug)]
pub struct AsyncTradingCommandSender {
    /// Sending half of the trading command channel.
    cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
}
87
impl AsyncTradingCommandSender {
    /// Creates a new [`AsyncTradingCommandSender`] that sends on `cmd_tx`.
    #[must_use]
    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>) -> Self {
        Self { cmd_tx }
    }
}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96    fn execute(&self, command: TradingCommand) {
97        if let Err(e) = self.cmd_tx.send(command) {
98            log::error!("Failed to send trading command: {e}");
99        }
100    }
101}
102
/// Minimal runner interface exposing a single synchronous `run` entry point.
pub trait Runner {
    /// Runs the implementor, with mutable access to its state.
    fn run(&mut self);
}
106
/// Channel receivers for the async event loop.
///
/// These can be extracted from `AsyncRunner` via `take_channels()` to drive
/// the event loop directly on the same thread as the msgbus endpoints.
#[derive(Debug)]
pub struct AsyncRunnerChannels {
    /// Receives time event handlers to run.
    pub time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
    /// Receives data events.
    pub data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
    /// Receives data commands.
    pub data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
    /// Receives execution events.
    pub exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
    /// Receives trading commands.
    pub exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
}
119
/// Event loop runner for live environments.
///
/// Holds the receiving halves of the event and command channels together with
/// both halves of the shutdown signal channel.
pub struct AsyncRunner {
    /// Receivers for time events, data events/commands and execution events/commands.
    channels: AsyncRunnerChannels,
    /// Receives the shutdown signal awaited by `run`.
    signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
    /// Sends the shutdown signal; used by `stop` and cloned by `handle`.
    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
}
125
/// Handle for stopping the [`AsyncRunner`] from another context.
///
/// Cloning the handle clones the underlying signal sender.
#[derive(Clone, Debug)]
pub struct AsyncRunnerHandle {
    /// Sending half of the runner's shutdown signal channel.
    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
}
131
132impl AsyncRunnerHandle {
133    /// Signals the runner to stop.
134    pub fn stop(&self) {
135        if let Err(e) = self.signal_tx.send(()) {
136            log::error!("Failed to send shutdown signal: {e}");
137        }
138    }
139}
140
impl Default for AsyncRunner {
    /// Equivalent to [`AsyncRunner::new`].
    fn default() -> Self {
        Self::new()
    }
}
146
147impl Debug for AsyncRunner {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.debug_struct(stringify!(AsyncRunner)).finish()
150    }
151}
152
153impl AsyncRunner {
154    /// Creates a new [`AsyncRunner`] instance.
155    #[must_use]
156    pub fn new() -> Self {
157        use tokio::sync::mpsc::unbounded_channel; // tokio-import-ok
158
159        let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandler>();
160        let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
161        let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
162        let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
163        let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
164        let (signal_tx, signal_rx) = unbounded_channel::<()>();
165
166        set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
167        set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
168        set_data_event_sender(data_evt_tx);
169        set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
170        set_exec_event_sender(exec_evt_tx);
171
172        Self {
173            channels: AsyncRunnerChannels {
174                time_evt_rx,
175                data_evt_rx,
176                data_cmd_rx,
177                exec_evt_rx,
178                exec_cmd_rx,
179            },
180            signal_rx,
181            signal_tx,
182        }
183    }
184
185    /// Stops the runner with an internal shutdown signal.
186    pub fn stop(&self) {
187        if let Err(e) = self.signal_tx.send(()) {
188            log::error!("Failed to send shutdown signal: {e}");
189        }
190    }
191
192    /// Returns a handle that can be used to stop the runner from another context.
193    #[must_use]
194    pub fn handle(&self) -> AsyncRunnerHandle {
195        AsyncRunnerHandle {
196            signal_tx: self.signal_tx.clone(),
197        }
198    }
199
200    /// Consumes the runner and returns the channel receivers for direct event loop driving.
201    ///
202    /// This is used when the event loop needs to run on the same thread as the msgbus
203    /// endpoints (which use thread-local storage).
204    #[must_use]
205    pub fn take_channels(self) -> AsyncRunnerChannels {
206        self.channels
207    }
208
209    /// Drains all pending data events from the channel and processes them.
210    pub fn drain_pending_data_events(&mut self) {
211        let mut count = 0;
212        while let Ok(evt) = self.channels.data_evt_rx.try_recv() {
213            Self::handle_data_event(evt);
214            count += 1;
215        }
216        if count > 0 {
217            log::debug!("Drained {count} pending data events");
218        }
219    }
220
221    /// Runs the async runner event loop.
222    ///
223    /// This method processes data events, time events, execution events, and signal events in an async loop.
224    /// It will run until a signal is received or the event streams are closed.
225    pub async fn run(&mut self) {
226        log::info!("AsyncRunner starting");
227
228        loop {
229            tokio::select! {
230                Some(()) = self.signal_rx.recv() => {
231                    log::info!("AsyncRunner received signal, shutting down");
232                    return;
233                },
234                Some(handler) = self.channels.time_evt_rx.recv() => {
235                    Self::handle_time_event(handler);
236                },
237                Some(cmd) = self.channels.data_cmd_rx.recv() => {
238                    Self::handle_data_command(cmd);
239                },
240                Some(evt) = self.channels.data_evt_rx.recv() => {
241                    Self::handle_data_event(evt);
242                },
243                Some(cmd) = self.channels.exec_cmd_rx.recv() => {
244                    Self::handle_exec_command(cmd);
245                },
246                Some(evt) = self.channels.exec_evt_rx.recv() => {
247                    Self::handle_exec_event(evt);
248                },
249                else => {
250                    log::debug!("AsyncRunner all channels closed, exiting");
251                    return;
252                }
253            };
254        }
255    }
256
257    /// Handles a time event by running its callback.
258    #[inline]
259    pub fn handle_time_event(handler: TimeEventHandler) {
260        handler.run();
261    }
262
263    /// Handles a data command by sending to the DataEngine.
264    #[inline]
265    pub fn handle_data_command(cmd: DataCommand) {
266        msgbus::send_any(MessagingSwitchboard::data_engine_execute(), &cmd);
267    }
268
269    /// Handles a data event by sending to the appropriate DataEngine endpoint.
270    #[inline]
271    pub fn handle_data_event(event: DataEvent) {
272        match event {
273            DataEvent::Data(data) => {
274                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
275            }
276            DataEvent::Instrument(data) => {
277                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
278            }
279            DataEvent::Response(resp) => {
280                msgbus::send_any(MessagingSwitchboard::data_engine_response(), &resp);
281            }
282            DataEvent::FundingRate(funding_rate) => {
283                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &funding_rate);
284            }
285            #[cfg(feature = "defi")]
286            DataEvent::DeFi(data) => {
287                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
288            }
289        }
290    }
291
292    /// Handles an execution command by sending to the ExecEngine.
293    #[inline]
294    pub fn handle_exec_command(cmd: TradingCommand) {
295        msgbus::send_any(MessagingSwitchboard::exec_engine_execute(), &cmd);
296    }
297
298    /// Handles an execution event by sending to the appropriate engine endpoint.
299    #[inline]
300    pub fn handle_exec_event(event: ExecutionEvent) {
301        match event {
302            ExecutionEvent::Order(ref order_event) => {
303                msgbus::send_any(MessagingSwitchboard::exec_engine_process(), order_event);
304            }
305            ExecutionEvent::Report(report) => {
306                Self::handle_exec_report(report);
307            }
308            ExecutionEvent::Account(ref account) => {
309                msgbus::send_any(MessagingSwitchboard::portfolio_update_account(), account);
310            }
311        }
312    }
313
314    #[inline]
315    pub fn handle_exec_report(report: ExecutionReport) {
316        let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
317        msgbus::send_any(endpoint, &report as &dyn Any);
318    }
319}
320
321#[cfg(test)]
322mod tests {
323    use std::time::Duration;
324
325    use nautilus_common::{
326        messages::{
327            ExecutionEvent, ExecutionReport,
328            data::{SubscribeCommand, SubscribeCustomData},
329            execution::TradingCommand,
330        },
331        timer::{TimeEvent, TimeEventCallback, TimeEventHandler},
332    };
333    use nautilus_core::{UUID4, UnixNanos};
334    use nautilus_model::{
335        data::{Data, DataType, quote::QuoteTick},
336        enums::{
337            AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
338            TimeInForce,
339        },
340        events::{OrderEvent, OrderEventAny, OrderSubmitted, account::state::AccountState},
341        identifiers::{
342            AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
343            TraderId, VenueOrderId,
344        },
345        reports::{FillReport, OrderStatusReport, PositionStatusReport},
346        types::{Money, Price, Quantity},
347    };
348    use rstest::rstest;
349    use ustr::Ustr;
350
351    use super::*;
352
353    // Test fixture for creating test quotes
354    fn test_quote() -> QuoteTick {
355        QuoteTick {
356            instrument_id: InstrumentId::from("EUR/USD.SIM"),
357            bid_price: Price::from("1.10000"),
358            ask_price: Price::from("1.10001"),
359            bid_size: Quantity::from(1_000_000),
360            ask_size: Quantity::from(1_000_000),
361            ts_event: UnixNanos::default(),
362            ts_init: UnixNanos::default(),
363        }
364    }
365
366    // Test helper to create AsyncRunner with manual channels
367    fn create_test_runner(
368        time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
369        data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
370        data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
371        exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
372        exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
373        signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
374        signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
375    ) -> AsyncRunner {
376        AsyncRunner {
377            channels: AsyncRunnerChannels {
378                time_evt_rx,
379                data_evt_rx,
380                data_cmd_rx,
381                exec_evt_rx,
382                exec_cmd_rx,
383            },
384            signal_rx,
385            signal_tx,
386        }
387    }
388
389    #[rstest]
390    fn test_async_data_command_sender_creation() {
391        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
392        let sender = AsyncDataCommandSender::new(tx);
393        assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
394    }
395
396    #[rstest]
397    fn test_async_time_event_sender_creation() {
398        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
399        let sender = AsyncTimeEventSender::new(tx);
400        assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
401    }
402
403    #[rstest]
404    fn test_async_time_event_sender_get_channel() {
405        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
406        let sender = AsyncTimeEventSender::new(tx);
407        let channel = sender.get_channel_sender();
408
409        // Verify the channel is functional
410        let event = TimeEvent::new(
411            Ustr::from("test"),
412            UUID4::new(),
413            UnixNanos::from(1),
414            UnixNanos::from(2),
415        );
416        let callback = TimeEventCallback::from(|_: TimeEvent| {});
417        let handler = TimeEventHandler::new(event, callback);
418
419        assert!(channel.send(handler).is_ok());
420    }
421
422    #[tokio::test]
423    async fn test_async_data_command_sender_execute() {
424        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
425        let sender = AsyncDataCommandSender::new(tx);
426
427        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
428            client_id: Some(ClientId::from("TEST")),
429            venue: None,
430            data_type: DataType::new("QuoteTick", None),
431            command_id: UUID4::new(),
432            ts_init: UnixNanos::default(),
433            correlation_id: None,
434            params: None,
435        }));
436
437        sender.execute(command.clone());
438
439        let received = rx.recv().await.unwrap();
440        match (received, command) {
441            (
442                DataCommand::Subscribe(SubscribeCommand::Data(r)),
443                DataCommand::Subscribe(SubscribeCommand::Data(c)),
444            ) => {
445                assert_eq!(r.client_id, c.client_id);
446                assert_eq!(r.data_type, c.data_type);
447            }
448            _ => panic!("Command mismatch"),
449        }
450    }
451
452    #[tokio::test]
453    async fn test_async_time_event_sender_send() {
454        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
455        let sender = AsyncTimeEventSender::new(tx);
456
457        let event = TimeEvent::new(
458            Ustr::from("test"),
459            UUID4::new(),
460            UnixNanos::from(1),
461            UnixNanos::from(2),
462        );
463        let callback = TimeEventCallback::from(|_: TimeEvent| {});
464        let handler = TimeEventHandler::new(event, callback);
465
466        sender.send(handler);
467
468        assert!(rx.recv().await.is_some());
469    }
470
471    #[tokio::test]
472    async fn test_runner_shutdown_signal() {
473        // Create runner with manual channels to avoid global state
474        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
475        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
476        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
477        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
478        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
479        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
480
481        let mut runner = create_test_runner(
482            time_evt_rx,
483            data_evt_rx,
484            data_cmd_rx,
485            exec_evt_rx,
486            exec_cmd_rx,
487            signal_rx,
488            signal_tx.clone(),
489        );
490
491        // Start runner
492        let runner_handle = tokio::spawn(async move {
493            runner.run().await;
494        });
495
496        // Send shutdown signal
497        signal_tx.send(()).unwrap();
498
499        // Runner should stop quickly
500        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
501        assert!(result.is_ok(), "Runner should stop on signal");
502    }
503
504    #[tokio::test]
505    async fn test_runner_closes_on_channel_drop() {
506        let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
507        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
508        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
509        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
510        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
511        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
512
513        let mut runner = create_test_runner(
514            time_evt_rx,
515            data_evt_rx,
516            data_cmd_rx,
517            exec_evt_rx,
518            exec_cmd_rx,
519            signal_rx,
520            signal_tx.clone(),
521        );
522
523        // Start runner
524        let runner_handle = tokio::spawn(async move {
525            runner.run().await;
526        });
527
528        // Drop data sender to close channel - this should cause runner to exit
529        drop(data_tx);
530
531        // Send stop signal to ensure clean shutdown
532        tokio::time::sleep(Duration::from_millis(50)).await;
533        signal_tx.send(()).ok();
534
535        // Runner should stop when channels close or on signal
536        let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
537        assert!(
538            result.is_ok(),
539            "Runner should stop when channels close or on signal"
540        );
541    }
542
543    #[tokio::test]
544    async fn test_concurrent_event_sending() {
545        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
546        let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
547        let (_time_evt_tx, time_evt_rx) =
548            tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
549        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
550        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
551        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
552
553        // Setup runner
554        let mut runner = create_test_runner(
555            time_evt_rx,
556            data_evt_rx,
557            data_cmd_rx,
558            exec_evt_rx,
559            exec_cmd_rx,
560            signal_rx,
561            signal_tx.clone(),
562        );
563
564        // Spawn multiple concurrent senders
565        let mut handles = vec![];
566        for _ in 0..5 {
567            let tx_clone = data_evt_tx.clone();
568            let handle = tokio::spawn(async move {
569                for _ in 0..20 {
570                    let quote = test_quote();
571                    tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
572                    tokio::task::yield_now().await;
573                }
574            });
575            handles.push(handle);
576        }
577
578        // Start runner in background
579        let runner_handle = tokio::spawn(async move {
580            runner.run().await;
581        });
582
583        // Wait for all senders
584        for handle in handles {
585            handle.await.unwrap();
586        }
587
588        // Give runner time to process
589        tokio::time::sleep(Duration::from_millis(50)).await;
590
591        // Stop runner
592        signal_tx.send(()).unwrap();
593
594        let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
595    }
596
597    #[rstest]
598    #[case(10)]
599    #[case(100)]
600    #[case(1000)]
601    fn test_channel_send_performance(#[case] count: usize) {
602        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
603        let quote = test_quote();
604
605        // Send events
606        for _ in 0..count {
607            tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
608        }
609
610        // Verify all received
611        let mut received = 0;
612        while rx.try_recv().is_ok() {
613            received += 1;
614        }
615
616        assert_eq!(received, count);
617    }
618
619    #[rstest]
620    fn test_async_trading_command_sender_creation() {
621        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
622        let sender = AsyncTradingCommandSender::new(tx);
623        assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
624    }
625
626    #[tokio::test]
627    async fn test_execution_event_order_channel() {
628        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
629
630        let event = OrderSubmitted::new(
631            TraderId::from("TRADER-001"),
632            StrategyId::from("S-001"),
633            InstrumentId::from("EUR/USD.SIM"),
634            ClientOrderId::from("O-001"),
635            AccountId::from("SIM-001"),
636            UUID4::new(),
637            UnixNanos::from(1),
638            UnixNanos::from(2),
639        );
640
641        tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
642            .unwrap();
643
644        let received = rx.recv().await.unwrap();
645        match received {
646            ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
647                assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
648            }
649            _ => panic!("Expected OrderSubmitted event"),
650        }
651    }
652
653    #[tokio::test]
654    async fn test_execution_report_order_status_channel() {
655        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
656
657        let report = OrderStatusReport::new(
658            AccountId::from("SIM-001"),
659            InstrumentId::from("EUR/USD.SIM"),
660            Some(ClientOrderId::from("O-001")),
661            VenueOrderId::from("V-001"),
662            OrderSide::Buy,
663            OrderType::Market,
664            TimeInForce::Gtc,
665            OrderStatus::Accepted,
666            Quantity::from(100_000),
667            Quantity::from(100_000),
668            UnixNanos::from(1),
669            UnixNanos::from(2),
670            UnixNanos::from(3),
671            None,
672        );
673
674        tx.send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
675            report,
676        ))))
677        .unwrap();
678
679        let received = rx.recv().await.unwrap();
680        match received {
681            ExecutionEvent::Report(ExecutionReport::Order(r)) => {
682                assert_eq!(r.venue_order_id.as_str(), "V-001");
683                assert_eq!(r.order_status, OrderStatus::Accepted);
684            }
685            _ => panic!("Expected OrderStatusReport"),
686        }
687    }
688
689    #[tokio::test]
690    async fn test_execution_report_fill() {
691        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
692
693        let report = FillReport::new(
694            AccountId::from("SIM-001"),
695            InstrumentId::from("EUR/USD.SIM"),
696            VenueOrderId::from("V-001"),
697            TradeId::from("T-001"),
698            OrderSide::Buy,
699            Quantity::from(100_000),
700            Price::from("1.10000"),
701            Money::from("10 USD"),
702            LiquiditySide::Taker,
703            Some(ClientOrderId::from("O-001")),
704            None,
705            UnixNanos::from(1),
706            UnixNanos::from(2),
707            None,
708        );
709
710        tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
711            report,
712        ))))
713        .unwrap();
714
715        let received = rx.recv().await.unwrap();
716        match received {
717            ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
718                assert_eq!(r.venue_order_id.as_str(), "V-001");
719                assert_eq!(r.trade_id.to_string(), "T-001");
720            }
721            _ => panic!("Expected FillReport"),
722        }
723    }
724
725    #[tokio::test]
726    async fn test_execution_report_position() {
727        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
728
729        let report = PositionStatusReport::new(
730            AccountId::from("SIM-001"),
731            InstrumentId::from("EUR/USD.SIM"),
732            PositionSideSpecified::Long,
733            Quantity::from(100_000),
734            UnixNanos::from(1),
735            UnixNanos::from(2),
736            None,
737            Some(PositionId::from("P-001")),
738            None,
739        );
740
741        tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
742            report,
743        ))))
744        .unwrap();
745
746        let received = rx.recv().await.unwrap();
747        match received {
748            ExecutionEvent::Report(ExecutionReport::Position(r)) => {
749                assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
750            }
751            _ => panic!("Expected PositionStatusReport"),
752        }
753    }
754
755    #[tokio::test]
756    async fn test_execution_event_account() {
757        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
758
759        let account_state = AccountState::new(
760            AccountId::from("SIM-001"),
761            AccountType::Cash,
762            vec![],
763            vec![],
764            true,
765            UUID4::new(),
766            UnixNanos::from(1),
767            UnixNanos::from(2),
768            None,
769        );
770
771        tx.send(ExecutionEvent::Account(account_state)).unwrap();
772
773        let received = rx.recv().await.unwrap();
774        match received {
775            ExecutionEvent::Account(r) => {
776                assert_eq!(r.account_id.as_str(), "SIM-001");
777            }
778            _ => panic!("Expected AccountState"),
779        }
780    }
781
782    #[tokio::test]
783    async fn test_runner_stop_method() {
784        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
785        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
786        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
787        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
788        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
789        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
790
791        let mut runner = create_test_runner(
792            time_evt_rx,
793            data_evt_rx,
794            data_cmd_rx,
795            exec_evt_rx,
796            exec_cmd_rx,
797            signal_rx,
798            signal_tx.clone(),
799        );
800
801        let runner_handle = tokio::spawn(async move {
802            runner.run().await;
803        });
804
805        // Use stop via signal_tx directly
806        signal_tx.send(()).unwrap();
807
808        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
809        assert!(result.is_ok(), "Runner should stop when stop() is called");
810    }
811
812    #[tokio::test]
813    async fn test_all_event_types_integration() {
814        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
815        let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
816        let (time_evt_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
817        let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
818        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
819        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
820
821        let mut runner = create_test_runner(
822            time_evt_rx,
823            data_evt_rx,
824            data_cmd_rx,
825            exec_evt_rx,
826            exec_cmd_rx,
827            signal_rx,
828            signal_tx.clone(),
829        );
830
831        let runner_handle = tokio::spawn(async move {
832            runner.run().await;
833        });
834
835        // Send data event
836        let quote = test_quote();
837        data_evt_tx
838            .send(DataEvent::Data(Data::Quote(quote)))
839            .unwrap();
840
841        // Send data command
842        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
843            client_id: Some(ClientId::from("TEST")),
844            venue: None,
845            data_type: DataType::new("QuoteTick", None),
846            command_id: UUID4::new(),
847            ts_init: UnixNanos::default(),
848            correlation_id: None,
849            params: None,
850        }));
851        data_cmd_tx.send(command).unwrap();
852
853        // Send time event
854        let event = TimeEvent::new(
855            Ustr::from("test"),
856            UUID4::new(),
857            UnixNanos::from(1),
858            UnixNanos::from(2),
859        );
860        let callback = TimeEventCallback::from(|_: TimeEvent| {});
861        let handler = TimeEventHandler::new(event, callback);
862        time_evt_tx.send(handler).unwrap();
863
864        // Send execution order event
865        let order_event = OrderSubmitted::new(
866            TraderId::from("TRADER-001"),
867            StrategyId::from("S-001"),
868            InstrumentId::from("EUR/USD.SIM"),
869            ClientOrderId::from("O-001"),
870            AccountId::from("SIM-001"),
871            UUID4::new(),
872            UnixNanos::from(1),
873            UnixNanos::from(2),
874        );
875        exec_evt_tx
876            .send(ExecutionEvent::Order(OrderEventAny::Submitted(order_event)))
877            .unwrap();
878
879        // Send execution report (OrderStatus)
880        let order_status = OrderStatusReport::new(
881            AccountId::from("SIM-001"),
882            InstrumentId::from("EUR/USD.SIM"),
883            Some(ClientOrderId::from("O-001")),
884            VenueOrderId::from("V-001"),
885            OrderSide::Buy,
886            OrderType::Market,
887            TimeInForce::Gtc,
888            OrderStatus::Accepted,
889            Quantity::from(100_000),
890            Quantity::from(100_000),
891            UnixNanos::from(1),
892            UnixNanos::from(2),
893            UnixNanos::from(3),
894            None,
895        );
896        exec_evt_tx
897            .send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
898                order_status,
899            ))))
900            .unwrap();
901
902        // Send execution report (Fill)
903        let fill = FillReport::new(
904            AccountId::from("SIM-001"),
905            InstrumentId::from("EUR/USD.SIM"),
906            VenueOrderId::from("V-001"),
907            TradeId::from("T-001"),
908            OrderSide::Buy,
909            Quantity::from(100_000),
910            Price::from("1.10000"),
911            Money::from("10 USD"),
912            LiquiditySide::Taker,
913            Some(ClientOrderId::from("O-001")),
914            None,
915            UnixNanos::from(1),
916            UnixNanos::from(2),
917            None,
918        );
919        exec_evt_tx
920            .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
921                fill,
922            ))))
923            .unwrap();
924
925        // Send execution report (Position)
926        let position = PositionStatusReport::new(
927            AccountId::from("SIM-001"),
928            InstrumentId::from("EUR/USD.SIM"),
929            PositionSideSpecified::Long,
930            Quantity::from(100_000),
931            UnixNanos::from(1),
932            UnixNanos::from(2),
933            None,
934            Some(PositionId::from("P-001")),
935            None,
936        );
937        exec_evt_tx
938            .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
939                position,
940            ))))
941            .unwrap();
942
943        // Send account event
944        let account_state = AccountState::new(
945            AccountId::from("SIM-001"),
946            AccountType::Cash,
947            vec![],
948            vec![],
949            true,
950            UUID4::new(),
951            UnixNanos::from(1),
952            UnixNanos::from(2),
953            None,
954        );
955        exec_evt_tx
956            .send(ExecutionEvent::Account(account_state))
957            .unwrap();
958
959        // Give runner time to process all events
960        tokio::time::sleep(Duration::from_millis(100)).await;
961
962        // Stop runner
963        signal_tx.send(()).unwrap();
964
965        let result = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
966        assert!(
967            result.is_ok(),
968            "Runner should process all event types and stop cleanly"
969        );
970    }
971
972    #[tokio::test]
973    async fn test_runner_handle_stops_runner() {
974        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
975        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
976        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
977        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
978        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
979        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
980
981        let mut runner = create_test_runner(
982            time_evt_rx,
983            data_evt_rx,
984            data_cmd_rx,
985            exec_evt_rx,
986            exec_cmd_rx,
987            signal_rx,
988            signal_tx.clone(),
989        );
990
991        // Get handle before moving runner
992        let handle = runner.handle();
993
994        let runner_task = tokio::spawn(async move {
995            runner.run().await;
996        });
997
998        // Use handle to stop
999        handle.stop();
1000
1001        let result = tokio::time::timeout(Duration::from_millis(100), runner_task).await;
1002        assert!(result.is_ok(), "Runner should stop via handle");
1003    }
1004
1005    #[tokio::test]
1006    async fn test_runner_handle_is_cloneable() {
1007        let (signal_tx, _signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1008        let handle = AsyncRunnerHandle { signal_tx };
1009
1010        let handle2 = handle.clone();
1011
1012        // Both handles should be able to send stop signals
1013        assert!(handle.signal_tx.send(()).is_ok());
1014        assert!(handle2.signal_tx.send(()).is_ok());
1015    }
1016
1017    #[tokio::test]
1018    async fn test_runner_processes_events_before_stop() {
1019        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1020        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1021        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1022        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1023        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1024        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1025
1026        let mut runner = create_test_runner(
1027            time_evt_rx,
1028            data_evt_rx,
1029            data_cmd_rx,
1030            exec_evt_rx,
1031            exec_cmd_rx,
1032            signal_rx,
1033            signal_tx.clone(),
1034        );
1035
1036        let handle = runner.handle();
1037
1038        // Send events before starting runner
1039        for _ in 0..10 {
1040            let quote = test_quote();
1041            data_evt_tx
1042                .send(DataEvent::Data(Data::Quote(quote)))
1043                .unwrap();
1044        }
1045
1046        let runner_task = tokio::spawn(async move {
1047            runner.run().await;
1048        });
1049
1050        // Give runner time to process queued events
1051        tokio::time::sleep(Duration::from_millis(50)).await;
1052
1053        // Stop runner
1054        handle.stop();
1055
1056        let result = tokio::time::timeout(Duration::from_millis(200), runner_task).await;
1057        assert!(result.is_ok(), "Runner should process events and stop");
1058    }
1059}