1use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19 messages::{
20 DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
21 },
22 msgbus::{self, switchboard::MessagingSwitchboard},
23 runner::{
24 DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
25 set_data_event_sender, set_exec_cmd_sender, set_exec_event_sender, set_time_event_sender,
26 },
27 timer::TimeEventHandlerV2,
28};
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
30
31#[derive(Debug)]
33pub struct AsyncDataCommandSender {
34 cmd_tx: UnboundedSender<DataCommand>,
35}
36
37impl AsyncDataCommandSender {
38 #[must_use]
39 pub const fn new(cmd_tx: UnboundedSender<DataCommand>) -> Self {
40 Self { cmd_tx }
41 }
42}
43
44impl DataCommandSender for AsyncDataCommandSender {
45 fn execute(&self, command: DataCommand) {
46 if let Err(e) = self.cmd_tx.send(command) {
47 log::error!("Failed to send data command: {e}");
48 }
49 }
50}
51
52#[derive(Debug, Clone)]
54pub struct AsyncTimeEventSender {
55 time_tx: UnboundedSender<TimeEventHandlerV2>,
56}
57
58impl AsyncTimeEventSender {
59 #[must_use]
60 pub const fn new(time_tx: UnboundedSender<TimeEventHandlerV2>) -> Self {
61 Self { time_tx }
62 }
63
64 #[must_use]
69 pub fn get_channel_sender(&self) -> UnboundedSender<TimeEventHandlerV2> {
70 self.time_tx.clone()
71 }
72}
73
74impl TimeEventSender for AsyncTimeEventSender {
75 fn send(&self, handler: TimeEventHandlerV2) {
76 if let Err(e) = self.time_tx.send(handler) {
77 log::error!("Failed to send time event handler: {e}");
78 }
79 }
80}
81
82#[derive(Debug)]
84pub struct AsyncTradingCommandSender {
85 cmd_tx: UnboundedSender<TradingCommand>,
86}
87
88impl AsyncTradingCommandSender {
89 #[must_use]
90 pub const fn new(cmd_tx: UnboundedSender<TradingCommand>) -> Self {
91 Self { cmd_tx }
92 }
93}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96 fn execute(&self, command: TradingCommand) {
97 if let Err(e) = self.cmd_tx.send(command) {
98 log::error!("Failed to send trading command: {e}");
99 }
100 }
101}
102
103pub trait Runner {
104 fn run(&mut self);
105}
106
107pub struct AsyncRunner {
108 time_evt_rx: UnboundedReceiver<TimeEventHandlerV2>,
109 data_evt_rx: UnboundedReceiver<DataEvent>,
110 data_cmd_rx: UnboundedReceiver<DataCommand>,
111 exec_evt_rx: UnboundedReceiver<ExecutionEvent>,
112 exec_cmd_rx: UnboundedReceiver<TradingCommand>,
113 signal_rx: UnboundedReceiver<()>,
114 signal_tx: UnboundedSender<()>,
115}
116
117impl Default for AsyncRunner {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl Debug for AsyncRunner {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.debug_struct(stringify!(AsyncRunner)).finish()
126 }
127}
128
129impl AsyncRunner {
130 #[must_use]
131 pub fn new() -> Self {
132 use tokio::sync::mpsc::unbounded_channel; let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandlerV2>();
135 let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
136 let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
137 let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
138 let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
139 let (signal_tx, signal_rx) = unbounded_channel::<()>();
140
141 set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
142 set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
143 set_data_event_sender(data_evt_tx);
144 set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
145 set_exec_event_sender(exec_evt_tx);
146
147 Self {
148 time_evt_rx,
149 data_evt_rx,
150 data_cmd_rx,
151 exec_evt_rx,
152 exec_cmd_rx,
153 signal_rx,
154 signal_tx,
155 }
156 }
157
158 pub fn stop(&self) {
160 if let Err(e) = self.signal_tx.send(()) {
161 log::error!("Failed to send shutdown signal: {e}");
162 }
163 }
164}
165
166impl AsyncRunner {
167 pub async fn run(&mut self) {
172 log::info!("Starting AsyncRunner");
173
174 loop {
175 tokio::select! {
176 Some(handler) = self.time_evt_rx.recv() => {
177 Self::handle_time_event(handler);
178 },
179 Some(cmd) = self.data_cmd_rx.recv() => {
180 Self::handle_data_command(cmd);
181 },
182 Some(evt) = self.data_evt_rx.recv() => {
183 Self::handle_data_event(evt);
184 },
185 Some(cmd) = self.exec_cmd_rx.recv() => {
186 Self::handle_exec_command(cmd);
187 },
188 Some(evt) = self.exec_evt_rx.recv() => {
189 Self::handle_exec_event(evt);
190 },
191 Some(()) = self.signal_rx.recv() => {
192 tracing::info!("AsyncRunner received signal, shutting down");
193 return; },
195 else => return, };
197 }
198 }
199
200 #[inline]
201 fn handle_time_event(handler: TimeEventHandlerV2) {
202 handler.run();
203 }
204
205 #[inline]
206 fn handle_data_command(cmd: DataCommand) {
207 msgbus::send_any(MessagingSwitchboard::data_engine_execute(), &cmd);
208 }
209
210 #[inline]
211 fn handle_data_event(event: DataEvent) {
212 match event {
213 DataEvent::Data(data) => {
214 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
215 }
216 DataEvent::Response(resp) => {
217 msgbus::send_any(MessagingSwitchboard::data_engine_response(), &resp);
218 }
219 #[cfg(feature = "defi")]
220 DataEvent::DeFi(data) => {
221 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
222 }
223 }
224 }
225
226 #[inline]
227 fn handle_exec_command(cmd: TradingCommand) {
228 msgbus::send_any(MessagingSwitchboard::exec_engine_execute(), &cmd);
229 }
230
231 #[inline]
232 fn handle_exec_event(event: ExecutionEvent) {
233 match event {
234 ExecutionEvent::Order(order_event) => {
235 msgbus::send_any(MessagingSwitchboard::exec_engine_process(), &order_event);
236 }
237 ExecutionEvent::Report(report) => {
238 Self::handle_exec_report(report);
239 }
240 ExecutionEvent::Account(account) => {
241 msgbus::send_any(MessagingSwitchboard::portfolio_update_account(), &account);
242 }
243 }
244 }
245
246 #[inline]
247 fn handle_exec_report(report: ExecutionReport) {
248 match report {
249 ExecutionReport::OrderStatus(r) => {
250 msgbus::send_any(
251 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
252 &*r,
253 );
254 }
255 ExecutionReport::Fill(r) => {
256 msgbus::send_any(
257 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
258 &*r,
259 );
260 }
261 ExecutionReport::Position(r) => {
262 msgbus::send_any(
263 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
264 &*r,
265 );
266 }
267 ExecutionReport::Mass(r) => {
268 msgbus::send_any(
269 MessagingSwitchboard::exec_engine_reconcile_execution_mass_status(),
270 &*r,
271 );
272 }
273 }
274 }
275}
276
277#[cfg(test)]
282mod tests {
283 use std::time::Duration;
284
285 use nautilus_common::{
286 messages::{
287 ExecutionEvent, ExecutionReport,
288 data::{SubscribeCommand, SubscribeCustomData},
289 execution::TradingCommand,
290 },
291 timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
292 };
293 use nautilus_core::{UUID4, UnixNanos};
294 use nautilus_model::{
295 data::{Data, DataType, quote::QuoteTick},
296 enums::{
297 AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
298 TimeInForce,
299 },
300 events::{OrderEvent, OrderEventAny, OrderSubmitted, account::state::AccountState},
301 identifiers::{
302 AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
303 TraderId, VenueOrderId,
304 },
305 reports::{FillReport, OrderStatusReport, PositionStatusReport},
306 types::{Money, Price, Quantity},
307 };
308 use rstest::rstest;
309 use ustr::Ustr;
310
311 use super::*;
312
313 fn test_quote() -> QuoteTick {
315 QuoteTick {
316 instrument_id: InstrumentId::from("EUR/USD.SIM"),
317 bid_price: Price::from("1.10000"),
318 ask_price: Price::from("1.10001"),
319 bid_size: Quantity::from(1_000_000),
320 ask_size: Quantity::from(1_000_000),
321 ts_event: UnixNanos::default(),
322 ts_init: UnixNanos::default(),
323 }
324 }
325
326 #[rstest]
327 fn test_async_data_command_sender_creation() {
328 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
329 let sender = AsyncDataCommandSender::new(tx);
330 assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
331 }
332
333 #[rstest]
334 fn test_async_time_event_sender_creation() {
335 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
336 let sender = AsyncTimeEventSender::new(tx);
337 assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
338 }
339
340 #[rstest]
341 fn test_async_time_event_sender_get_channel() {
342 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
343 let sender = AsyncTimeEventSender::new(tx);
344 let channel = sender.get_channel_sender();
345
346 let event = TimeEvent::new(
348 Ustr::from("test"),
349 UUID4::new(),
350 UnixNanos::from(1),
351 UnixNanos::from(2),
352 );
353 let callback = TimeEventCallback::from(|_: TimeEvent| {});
354 let handler = TimeEventHandlerV2::new(event, callback);
355
356 assert!(channel.send(handler).is_ok());
357 }
358
359 #[tokio::test]
360 async fn test_async_data_command_sender_execute() {
361 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
362 let sender = AsyncDataCommandSender::new(tx);
363
364 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
365 client_id: Some(ClientId::from("TEST")),
366 venue: None,
367 data_type: DataType::new("QuoteTick", None),
368 command_id: UUID4::new(),
369 ts_init: UnixNanos::default(),
370 params: None,
371 }));
372
373 sender.execute(command.clone());
374
375 let received = rx.recv().await.unwrap();
376 match (received, command) {
377 (
378 DataCommand::Subscribe(SubscribeCommand::Data(r)),
379 DataCommand::Subscribe(SubscribeCommand::Data(c)),
380 ) => {
381 assert_eq!(r.client_id, c.client_id);
382 assert_eq!(r.data_type, c.data_type);
383 }
384 _ => panic!("Command mismatch"),
385 }
386 }
387
388 #[tokio::test]
389 async fn test_async_time_event_sender_send() {
390 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
391 let sender = AsyncTimeEventSender::new(tx);
392
393 let event = TimeEvent::new(
394 Ustr::from("test"),
395 UUID4::new(),
396 UnixNanos::from(1),
397 UnixNanos::from(2),
398 );
399 let callback = TimeEventCallback::from(|_: TimeEvent| {});
400 let handler = TimeEventHandlerV2::new(event, callback);
401
402 sender.send(handler);
403
404 assert!(rx.recv().await.is_some());
405 }
406
407 #[tokio::test]
408 async fn test_runner_shutdown_signal() {
409 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
411 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
412 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
413 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
414 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
415 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
416
417 let mut runner = AsyncRunner {
418 data_evt_rx,
419 data_cmd_rx,
420 time_evt_rx,
421 exec_evt_rx,
422 exec_cmd_rx,
423 signal_rx,
424 signal_tx: signal_tx.clone(),
425 };
426
427 let runner_handle = tokio::spawn(async move {
429 runner.run().await;
430 });
431
432 signal_tx.send(()).unwrap();
434
435 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
437 assert!(result.is_ok(), "Runner should stop on signal");
438 }
439
440 #[tokio::test]
441 async fn test_runner_closes_on_channel_drop() {
442 let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
443 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
444 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
445 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
446 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
447 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
448
449 let mut runner = AsyncRunner {
450 data_evt_rx,
451 data_cmd_rx,
452 time_evt_rx,
453 exec_evt_rx,
454 exec_cmd_rx,
455 signal_rx,
456 signal_tx: signal_tx.clone(),
457 };
458
459 let runner_handle = tokio::spawn(async move {
461 runner.run().await;
462 });
463
464 drop(data_tx);
466
467 tokio::time::sleep(Duration::from_millis(50)).await;
469 signal_tx.send(()).ok();
470
471 let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
473 assert!(
474 result.is_ok(),
475 "Runner should stop when channels close or on signal"
476 );
477 }
478
479 #[tokio::test]
480 async fn test_concurrent_event_sending() {
481 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
482 let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
483 let (_time_evt_tx, time_evt_rx) =
484 tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
485 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
486 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
487 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
488
489 let mut runner = AsyncRunner {
491 time_evt_rx,
492 data_evt_rx,
493 data_cmd_rx,
494 exec_evt_rx,
495 exec_cmd_rx,
496 signal_rx,
497 signal_tx: signal_tx.clone(),
498 };
499
500 let mut handles = vec![];
502 for _ in 0..5 {
503 let tx_clone = data_evt_tx.clone();
504 let handle = tokio::spawn(async move {
505 for _ in 0..20 {
506 let quote = test_quote();
507 tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
508 tokio::task::yield_now().await;
509 }
510 });
511 handles.push(handle);
512 }
513
514 let runner_handle = tokio::spawn(async move {
516 runner.run().await;
517 });
518
519 for handle in handles {
521 handle.await.unwrap();
522 }
523
524 tokio::time::sleep(Duration::from_millis(50)).await;
526
527 signal_tx.send(()).unwrap();
529
530 let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
531 }
532
533 #[rstest]
534 #[case(10)]
535 #[case(100)]
536 #[case(1000)]
537 fn test_channel_send_performance(#[case] count: usize) {
538 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
539 let quote = test_quote();
540
541 for _ in 0..count {
543 tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
544 }
545
546 let mut received = 0;
548 while rx.try_recv().is_ok() {
549 received += 1;
550 }
551
552 assert_eq!(received, count);
553 }
554
555 #[rstest]
556 fn test_async_trading_command_sender_creation() {
557 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
558 let sender = AsyncTradingCommandSender::new(tx);
559 assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
560 }
561
562 #[tokio::test]
563 async fn test_execution_event_order_channel() {
564 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
565
566 let event = OrderSubmitted::new(
567 TraderId::from("TRADER-001"),
568 StrategyId::from("S-001"),
569 InstrumentId::from("EUR/USD.SIM"),
570 ClientOrderId::from("O-001"),
571 AccountId::from("SIM-001"),
572 UUID4::new(),
573 UnixNanos::from(1),
574 UnixNanos::from(2),
575 );
576
577 tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
578 .unwrap();
579
580 let received = rx.recv().await.unwrap();
581 match received {
582 ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
583 assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
584 }
585 _ => panic!("Expected OrderSubmitted event"),
586 }
587 }
588
589 #[tokio::test]
590 async fn test_execution_report_order_status_channel() {
591 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
592
593 let report = OrderStatusReport::new(
594 AccountId::from("SIM-001"),
595 InstrumentId::from("EUR/USD.SIM"),
596 Some(ClientOrderId::from("O-001")),
597 VenueOrderId::from("V-001"),
598 OrderSide::Buy,
599 OrderType::Market,
600 TimeInForce::Gtc,
601 OrderStatus::Accepted,
602 Quantity::from(100_000),
603 Quantity::from(100_000),
604 UnixNanos::from(1),
605 UnixNanos::from(2),
606 UnixNanos::from(3),
607 None,
608 );
609
610 tx.send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
611 Box::new(report),
612 )))
613 .unwrap();
614
615 let received = rx.recv().await.unwrap();
616 match received {
617 ExecutionEvent::Report(ExecutionReport::OrderStatus(r)) => {
618 assert_eq!(r.venue_order_id.as_str(), "V-001");
619 assert_eq!(r.order_status, OrderStatus::Accepted);
620 }
621 _ => panic!("Expected OrderStatusReport"),
622 }
623 }
624
625 #[tokio::test]
626 async fn test_execution_report_fill() {
627 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
628
629 let report = FillReport::new(
630 AccountId::from("SIM-001"),
631 InstrumentId::from("EUR/USD.SIM"),
632 VenueOrderId::from("V-001"),
633 TradeId::from("T-001"),
634 OrderSide::Buy,
635 Quantity::from(100_000),
636 Price::from("1.10000"),
637 Money::from("10 USD"),
638 LiquiditySide::Taker,
639 Some(ClientOrderId::from("O-001")),
640 None,
641 UnixNanos::from(1),
642 UnixNanos::from(2),
643 None,
644 );
645
646 tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
647 report,
648 ))))
649 .unwrap();
650
651 let received = rx.recv().await.unwrap();
652 match received {
653 ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
654 assert_eq!(r.venue_order_id.as_str(), "V-001");
655 assert_eq!(r.trade_id.to_string(), "T-001");
656 }
657 _ => panic!("Expected FillReport"),
658 }
659 }
660
661 #[tokio::test]
662 async fn test_execution_report_position() {
663 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
664
665 let report = PositionStatusReport::new(
666 AccountId::from("SIM-001"),
667 InstrumentId::from("EUR/USD.SIM"),
668 PositionSideSpecified::Long,
669 Quantity::from(100_000),
670 UnixNanos::from(1),
671 UnixNanos::from(2),
672 None,
673 Some(PositionId::from("P-001")),
674 None,
675 );
676
677 tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
678 report,
679 ))))
680 .unwrap();
681
682 let received = rx.recv().await.unwrap();
683 match received {
684 ExecutionEvent::Report(ExecutionReport::Position(r)) => {
685 assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
686 }
687 _ => panic!("Expected PositionStatusReport"),
688 }
689 }
690
691 #[tokio::test]
692 async fn test_execution_event_account() {
693 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
694
695 let account_state = AccountState::new(
696 AccountId::from("SIM-001"),
697 AccountType::Cash,
698 vec![],
699 vec![],
700 true,
701 UUID4::new(),
702 UnixNanos::from(1),
703 UnixNanos::from(2),
704 None,
705 );
706
707 tx.send(ExecutionEvent::Account(account_state)).unwrap();
708
709 let received = rx.recv().await.unwrap();
710 match received {
711 ExecutionEvent::Account(r) => {
712 assert_eq!(r.account_id.as_str(), "SIM-001");
713 }
714 _ => panic!("Expected AccountState"),
715 }
716 }
717
718 #[tokio::test]
719 async fn test_runner_stop_method() {
720 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
721 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
722 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
723 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
724 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
725 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
726
727 let mut runner = AsyncRunner {
728 data_evt_rx,
729 data_cmd_rx,
730 time_evt_rx,
731 exec_evt_rx,
732 exec_cmd_rx,
733 signal_rx,
734 signal_tx: signal_tx.clone(),
735 };
736
737 let runner_handle = tokio::spawn(async move {
738 runner.run().await;
739 });
740
741 let stopper = AsyncRunner {
743 data_evt_rx: tokio::sync::mpsc::unbounded_channel::<DataEvent>().1,
744 data_cmd_rx: tokio::sync::mpsc::unbounded_channel::<DataCommand>().1,
745 time_evt_rx: tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>().1,
746 exec_evt_rx: tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>().1,
747 exec_cmd_rx: tokio::sync::mpsc::unbounded_channel::<TradingCommand>().1,
748 signal_rx: tokio::sync::mpsc::unbounded_channel::<()>().1,
749 signal_tx,
750 };
751
752 stopper.stop();
753
754 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
755 assert!(result.is_ok(), "Runner should stop when stop() is called");
756 }
757
758 #[tokio::test]
759 async fn test_all_event_types_integration() {
760 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
761 let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
762 let (time_evt_tx, time_evt_rx) =
763 tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
764 let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
765 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
766 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
767
768 let mut runner = AsyncRunner {
769 time_evt_rx,
770 data_evt_rx,
771 data_cmd_rx,
772 exec_evt_rx,
773 exec_cmd_rx,
774 signal_rx,
775 signal_tx: signal_tx.clone(),
776 };
777
778 let runner_handle = tokio::spawn(async move {
779 runner.run().await;
780 });
781
782 let quote = test_quote();
784 data_evt_tx
785 .send(DataEvent::Data(Data::Quote(quote)))
786 .unwrap();
787
788 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
790 client_id: Some(ClientId::from("TEST")),
791 venue: None,
792 data_type: nautilus_model::data::DataType::new("QuoteTick", None),
793 command_id: UUID4::new(),
794 ts_init: UnixNanos::default(),
795 params: None,
796 }));
797 data_cmd_tx.send(command).unwrap();
798
799 let event = TimeEvent::new(
801 Ustr::from("test"),
802 UUID4::new(),
803 UnixNanos::from(1),
804 UnixNanos::from(2),
805 );
806 let callback = TimeEventCallback::from(|_: TimeEvent| {});
807 let handler = TimeEventHandlerV2::new(event, callback);
808 time_evt_tx.send(handler).unwrap();
809
810 let order_event = OrderSubmitted::new(
812 TraderId::from("TRADER-001"),
813 StrategyId::from("S-001"),
814 InstrumentId::from("EUR/USD.SIM"),
815 ClientOrderId::from("O-001"),
816 AccountId::from("SIM-001"),
817 UUID4::new(),
818 UnixNanos::from(1),
819 UnixNanos::from(2),
820 );
821 exec_evt_tx
822 .send(ExecutionEvent::Order(
823 nautilus_model::events::OrderEventAny::Submitted(order_event),
824 ))
825 .unwrap();
826
827 let order_status = OrderStatusReport::new(
829 AccountId::from("SIM-001"),
830 InstrumentId::from("EUR/USD.SIM"),
831 Some(ClientOrderId::from("O-001")),
832 VenueOrderId::from("V-001"),
833 OrderSide::Buy,
834 OrderType::Market,
835 TimeInForce::Gtc,
836 OrderStatus::Accepted,
837 Quantity::from(100_000),
838 Quantity::from(100_000),
839 UnixNanos::from(1),
840 UnixNanos::from(2),
841 UnixNanos::from(3),
842 None,
843 );
844 exec_evt_tx
845 .send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
846 Box::new(order_status),
847 )))
848 .unwrap();
849
850 let fill = FillReport::new(
852 AccountId::from("SIM-001"),
853 InstrumentId::from("EUR/USD.SIM"),
854 VenueOrderId::from("V-001"),
855 TradeId::from("T-001"),
856 OrderSide::Buy,
857 Quantity::from(100_000),
858 Price::from("1.10000"),
859 Money::from("10 USD"),
860 LiquiditySide::Taker,
861 Some(ClientOrderId::from("O-001")),
862 None,
863 UnixNanos::from(1),
864 UnixNanos::from(2),
865 None,
866 );
867 exec_evt_tx
868 .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
869 fill,
870 ))))
871 .unwrap();
872
873 let position = PositionStatusReport::new(
875 AccountId::from("SIM-001"),
876 InstrumentId::from("EUR/USD.SIM"),
877 PositionSideSpecified::Long,
878 Quantity::from(100_000),
879 UnixNanos::from(1),
880 UnixNanos::from(2),
881 None,
882 Some(PositionId::from("P-001")),
883 None,
884 );
885 exec_evt_tx
886 .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
887 position,
888 ))))
889 .unwrap();
890
891 let account_state = AccountState::new(
893 AccountId::from("SIM-001"),
894 AccountType::Cash,
895 vec![],
896 vec![],
897 true,
898 UUID4::new(),
899 UnixNanos::from(1),
900 UnixNanos::from(2),
901 None,
902 );
903 exec_evt_tx
904 .send(ExecutionEvent::Account(account_state))
905 .unwrap();
906
907 tokio::time::sleep(Duration::from_millis(100)).await;
909
910 signal_tx.send(()).unwrap();
912
913 let result = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
914 assert!(
915 result.is_ok(),
916 "Runner should process all event types and stop cleanly"
917 );
918 }
919}