1use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19 live::runner::{set_data_event_sender, set_exec_event_sender},
20 messages::{
21 DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
22 },
23 msgbus::{self, MessagingSwitchboard},
24 runner::{
25 DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
26 set_exec_cmd_sender, set_time_event_sender,
27 },
28 timer::TimeEventHandler,
29};
30
31#[derive(Debug)]
33pub struct AsyncDataCommandSender {
34 cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
35}
36
37impl AsyncDataCommandSender {
38 #[must_use]
39 pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>) -> Self {
40 Self { cmd_tx }
41 }
42}
43
44impl DataCommandSender for AsyncDataCommandSender {
45 fn execute(&self, command: DataCommand) {
46 if let Err(e) = self.cmd_tx.send(command) {
47 log::error!("Failed to send data command: {e}");
48 }
49 }
50}
51
52#[derive(Debug, Clone)]
54pub struct AsyncTimeEventSender {
55 time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>,
56}
57
58impl AsyncTimeEventSender {
59 #[must_use]
60 pub const fn new(time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>) -> Self {
61 Self { time_tx }
62 }
63
64 #[must_use]
69 pub fn get_channel_sender(&self) -> tokio::sync::mpsc::UnboundedSender<TimeEventHandler> {
70 self.time_tx.clone()
71 }
72}
73
74impl TimeEventSender for AsyncTimeEventSender {
75 fn send(&self, handler: TimeEventHandler) {
76 if let Err(e) = self.time_tx.send(handler) {
77 log::error!("Failed to send time event handler: {e}");
78 }
79 }
80}
81
82#[derive(Debug)]
84pub struct AsyncTradingCommandSender {
85 cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
86}
87
88impl AsyncTradingCommandSender {
89 #[must_use]
90 pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>) -> Self {
91 Self { cmd_tx }
92 }
93}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96 fn execute(&self, command: TradingCommand) {
97 if let Err(e) = self.cmd_tx.send(command) {
98 log::error!("Failed to send trading command: {e}");
99 }
100 }
101}
102
/// Minimal synchronous runner abstraction.
pub trait Runner {
    /// Runs the implementor; takes `&mut self` so implementations may mutate their state.
    fn run(&mut self);
}
106
107#[derive(Debug)]
112pub struct AsyncRunnerChannels {
113 pub time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
114 pub data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
115 pub data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
116 pub exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
117 pub exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
118}
119
120pub struct AsyncRunner {
121 channels: AsyncRunnerChannels,
122 signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
123 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
124}
125
126#[derive(Clone, Debug)]
128pub struct AsyncRunnerHandle {
129 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
130}
131
132impl AsyncRunnerHandle {
133 pub fn stop(&self) {
135 if let Err(e) = self.signal_tx.send(()) {
136 log::error!("Failed to send shutdown signal: {e}");
137 }
138 }
139}
140
141impl Default for AsyncRunner {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147impl Debug for AsyncRunner {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct(stringify!(AsyncRunner)).finish()
150 }
151}
152
153impl AsyncRunner {
154 #[must_use]
156 pub fn new() -> Self {
157 use tokio::sync::mpsc::unbounded_channel; let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandler>();
160 let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
161 let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
162 let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
163 let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
164 let (signal_tx, signal_rx) = unbounded_channel::<()>();
165
166 set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
167 set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
168 set_data_event_sender(data_evt_tx);
169 set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
170 set_exec_event_sender(exec_evt_tx);
171
172 Self {
173 channels: AsyncRunnerChannels {
174 time_evt_rx,
175 data_evt_rx,
176 data_cmd_rx,
177 exec_evt_rx,
178 exec_cmd_rx,
179 },
180 signal_rx,
181 signal_tx,
182 }
183 }
184
185 pub fn stop(&self) {
187 if let Err(e) = self.signal_tx.send(()) {
188 log::error!("Failed to send shutdown signal: {e}");
189 }
190 }
191
192 #[must_use]
194 pub fn handle(&self) -> AsyncRunnerHandle {
195 AsyncRunnerHandle {
196 signal_tx: self.signal_tx.clone(),
197 }
198 }
199
200 #[must_use]
205 pub fn take_channels(self) -> AsyncRunnerChannels {
206 self.channels
207 }
208
209 pub fn drain_pending_data_events(&mut self) {
211 let mut count = 0;
212 while let Ok(evt) = self.channels.data_evt_rx.try_recv() {
213 Self::handle_data_event(evt);
214 count += 1;
215 }
216 if count > 0 {
217 log::debug!("Drained {count} pending data events");
218 }
219 }
220
221 pub async fn run(&mut self) {
226 log::info!("AsyncRunner starting");
227
228 loop {
229 tokio::select! {
230 Some(()) = self.signal_rx.recv() => {
231 log::info!("AsyncRunner received signal, shutting down");
232 return;
233 },
234 Some(handler) = self.channels.time_evt_rx.recv() => {
235 Self::handle_time_event(handler);
236 },
237 Some(cmd) = self.channels.data_cmd_rx.recv() => {
238 Self::handle_data_command(cmd);
239 },
240 Some(evt) = self.channels.data_evt_rx.recv() => {
241 Self::handle_data_event(evt);
242 },
243 Some(cmd) = self.channels.exec_cmd_rx.recv() => {
244 Self::handle_exec_command(cmd);
245 },
246 Some(evt) = self.channels.exec_evt_rx.recv() => {
247 Self::handle_exec_event(evt);
248 },
249 else => {
250 log::debug!("AsyncRunner all channels closed, exiting");
251 return;
252 }
253 };
254 }
255 }
256
257 #[inline]
259 pub fn handle_time_event(handler: TimeEventHandler) {
260 handler.run();
261 }
262
263 #[inline]
265 pub fn handle_data_command(cmd: DataCommand) {
266 msgbus::send_data_command(MessagingSwitchboard::data_engine_execute(), cmd);
267 }
268
269 #[inline]
271 pub fn handle_data_event(event: DataEvent) {
272 match event {
273 DataEvent::Data(data) => {
274 msgbus::send_data(MessagingSwitchboard::data_engine_process_data(), data);
275 }
276 DataEvent::Instrument(data) => {
277 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
278 }
279 DataEvent::Response(resp) => {
280 msgbus::send_data_response(MessagingSwitchboard::data_engine_response(), resp);
281 }
282 DataEvent::FundingRate(funding_rate) => {
283 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &funding_rate);
284 }
285 #[cfg(feature = "defi")]
286 DataEvent::DeFi(data) => {
287 msgbus::send_defi_data(MessagingSwitchboard::data_engine_process_defi_data(), data);
288 }
289 }
290 }
291
292 #[inline]
294 pub fn handle_exec_command(cmd: TradingCommand) {
295 msgbus::send_trading_command(MessagingSwitchboard::exec_engine_execute(), cmd);
296 }
297
298 #[inline]
300 pub fn handle_exec_event(event: ExecutionEvent) {
301 match event {
302 ExecutionEvent::Order(order_event) => {
303 msgbus::send_order_event(MessagingSwitchboard::exec_engine_process(), order_event);
304 }
305 ExecutionEvent::Report(report) => {
306 Self::handle_exec_report(report);
307 }
308 ExecutionEvent::Account(ref account) => {
309 msgbus::send_account_state(
310 MessagingSwitchboard::portfolio_update_account(),
311 account,
312 );
313 }
314 }
315 }
316
317 #[inline]
318 pub fn handle_exec_report(report: ExecutionReport) {
319 let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
320 msgbus::send_execution_report(endpoint, report);
321 }
322}
323
324#[cfg(test)]
325mod tests {
326 use std::time::Duration;
327
328 use nautilus_common::{
329 messages::{
330 ExecutionEvent, ExecutionReport,
331 data::{SubscribeCommand, SubscribeCustomData},
332 execution::{CancelAllOrders, TradingCommand},
333 },
334 timer::{TimeEvent, TimeEventCallback, TimeEventHandler},
335 };
336 use nautilus_core::{UUID4, UnixNanos};
337 use nautilus_model::{
338 data::{Data, DataType, quote::QuoteTick},
339 enums::{
340 AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
341 TimeInForce,
342 },
343 events::{OrderEvent, OrderEventAny, OrderSubmitted, account::state::AccountState},
344 identifiers::{
345 AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
346 TraderId, VenueOrderId,
347 },
348 reports::{FillReport, OrderStatusReport, PositionStatusReport},
349 types::{Money, Price, Quantity},
350 };
351 use rstest::rstest;
352 use ustr::Ustr;
353
354 use super::*;
355
356 fn test_quote() -> QuoteTick {
358 QuoteTick {
359 instrument_id: InstrumentId::from("EUR/USD.SIM"),
360 bid_price: Price::from("1.10000"),
361 ask_price: Price::from("1.10001"),
362 bid_size: Quantity::from(1_000_000),
363 ask_size: Quantity::from(1_000_000),
364 ts_event: UnixNanos::default(),
365 ts_init: UnixNanos::default(),
366 }
367 }
368
369 fn create_test_runner(
371 time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
372 data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
373 data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
374 exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
375 exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
376 signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
377 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
378 ) -> AsyncRunner {
379 AsyncRunner {
380 channels: AsyncRunnerChannels {
381 time_evt_rx,
382 data_evt_rx,
383 data_cmd_rx,
384 exec_evt_rx,
385 exec_cmd_rx,
386 },
387 signal_rx,
388 signal_tx,
389 }
390 }
391
392 #[rstest]
393 fn test_async_data_command_sender_creation() {
394 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
395 let sender = AsyncDataCommandSender::new(tx);
396 assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
397 }
398
399 #[rstest]
400 fn test_async_time_event_sender_creation() {
401 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
402 let sender = AsyncTimeEventSender::new(tx);
403 assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
404 }
405
406 #[rstest]
407 fn test_async_time_event_sender_get_channel() {
408 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
409 let sender = AsyncTimeEventSender::new(tx);
410 let channel = sender.get_channel_sender();
411
412 let event = TimeEvent::new(
414 Ustr::from("test"),
415 UUID4::new(),
416 UnixNanos::from(1),
417 UnixNanos::from(2),
418 );
419 let callback = TimeEventCallback::from(|_: TimeEvent| {});
420 let handler = TimeEventHandler::new(event, callback);
421
422 assert!(channel.send(handler).is_ok());
423 }
424
425 #[tokio::test]
426 async fn test_async_data_command_sender_execute() {
427 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
428 let sender = AsyncDataCommandSender::new(tx);
429
430 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
431 client_id: Some(ClientId::from("TEST")),
432 venue: None,
433 data_type: DataType::new("QuoteTick", None),
434 command_id: UUID4::new(),
435 ts_init: UnixNanos::default(),
436 correlation_id: None,
437 params: None,
438 }));
439
440 sender.execute(command.clone());
441
442 let received = rx.recv().await.unwrap();
443 match (received, command) {
444 (
445 DataCommand::Subscribe(SubscribeCommand::Data(r)),
446 DataCommand::Subscribe(SubscribeCommand::Data(c)),
447 ) => {
448 assert_eq!(r.client_id, c.client_id);
449 assert_eq!(r.data_type, c.data_type);
450 }
451 _ => panic!("Command mismatch"),
452 }
453 }
454
455 #[tokio::test]
456 async fn test_async_time_event_sender_send() {
457 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
458 let sender = AsyncTimeEventSender::new(tx);
459
460 let event = TimeEvent::new(
461 Ustr::from("test"),
462 UUID4::new(),
463 UnixNanos::from(1),
464 UnixNanos::from(2),
465 );
466 let callback = TimeEventCallback::from(|_: TimeEvent| {});
467 let handler = TimeEventHandler::new(event, callback);
468
469 sender.send(handler);
470
471 assert!(rx.recv().await.is_some());
472 }
473
474 #[tokio::test]
475 async fn test_runner_shutdown_signal() {
476 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
478 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
479 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
480 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
481 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
482 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
483
484 let mut runner = create_test_runner(
485 time_evt_rx,
486 data_evt_rx,
487 data_cmd_rx,
488 exec_evt_rx,
489 exec_cmd_rx,
490 signal_rx,
491 signal_tx.clone(),
492 );
493
494 let runner_handle = tokio::spawn(async move {
496 runner.run().await;
497 });
498
499 signal_tx.send(()).unwrap();
501
502 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
504 assert!(result.is_ok(), "Runner should stop on signal");
505 }
506
507 #[tokio::test]
508 async fn test_runner_closes_on_channel_drop() {
509 let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
510 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
511 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
512 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
513 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
514 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
515
516 let mut runner = create_test_runner(
517 time_evt_rx,
518 data_evt_rx,
519 data_cmd_rx,
520 exec_evt_rx,
521 exec_cmd_rx,
522 signal_rx,
523 signal_tx.clone(),
524 );
525
526 let runner_handle = tokio::spawn(async move {
528 runner.run().await;
529 });
530
531 drop(data_tx);
532
533 tokio::task::yield_now().await;
535 signal_tx.send(()).ok();
536
537 let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
539 assert!(
540 result.is_ok(),
541 "Runner should stop when channels close or on signal"
542 );
543 }
544
545 #[tokio::test]
546 async fn test_concurrent_event_sending() {
547 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
548 let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
549 let (_time_evt_tx, time_evt_rx) =
550 tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
551 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
552 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
553 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
554
555 let mut runner = create_test_runner(
557 time_evt_rx,
558 data_evt_rx,
559 data_cmd_rx,
560 exec_evt_rx,
561 exec_cmd_rx,
562 signal_rx,
563 signal_tx.clone(),
564 );
565
566 let mut handles = vec![];
568 for _ in 0..5 {
569 let tx_clone = data_evt_tx.clone();
570 let handle = tokio::spawn(async move {
571 for _ in 0..20 {
572 let quote = test_quote();
573 tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
574 tokio::task::yield_now().await;
575 }
576 });
577 handles.push(handle);
578 }
579
580 let runner_handle = tokio::spawn(async move {
582 runner.run().await;
583 });
584
585 for handle in handles {
587 handle.await.unwrap();
588 }
589
590 tokio::task::yield_now().await;
592 signal_tx.send(()).unwrap();
593
594 let _ = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
595 }
596
597 #[rstest]
598 #[case(10)]
599 #[case(100)]
600 #[case(1000)]
601 fn test_channel_send_performance(#[case] count: usize) {
602 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
603 let quote = test_quote();
604
605 for _ in 0..count {
607 tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
608 }
609
610 let mut received = 0;
612 while rx.try_recv().is_ok() {
613 received += 1;
614 }
615
616 assert_eq!(received, count);
617 }
618
619 #[rstest]
620 fn test_async_trading_command_sender_creation() {
621 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
622 let sender = AsyncTradingCommandSender::new(tx);
623 assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
624 }
625
626 #[tokio::test]
627 async fn test_async_trading_command_sender_execute() {
628 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
629 let sender = AsyncTradingCommandSender::new(tx);
630
631 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
632 TraderId::from("TRADER-001"),
633 None,
634 StrategyId::from("S-001"),
635 InstrumentId::from("EUR/USD.SIM"),
636 OrderSide::Buy,
637 UUID4::new(),
638 UnixNanos::default(),
639 None,
640 ));
641
642 sender.execute(command);
643
644 let received = rx.recv().await;
645 assert!(received.is_some());
646 assert!(matches!(
647 received.unwrap(),
648 TradingCommand::CancelAllOrders(_)
649 ));
650 }
651
652 #[tokio::test]
653 async fn test_runner_processes_trading_commands() {
654 let (_data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
655 let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
656 let (_time_evt_tx, time_evt_rx) =
657 tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
658 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
659 let (exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
660 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
661
662 let mut runner = create_test_runner(
663 time_evt_rx,
664 data_evt_rx,
665 data_cmd_rx,
666 exec_evt_rx,
667 exec_cmd_rx,
668 signal_rx,
669 signal_tx.clone(),
670 );
671
672 let runner_handle = tokio::spawn(async move {
673 runner.run().await;
674 });
675
676 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
677 TraderId::from("TRADER-001"),
678 None,
679 StrategyId::from("S-001"),
680 InstrumentId::from("EUR/USD.SIM"),
681 OrderSide::Buy,
682 UUID4::new(),
683 UnixNanos::default(),
684 None,
685 ));
686 exec_cmd_tx.send(command).unwrap();
687
688 tokio::task::yield_now().await;
689 signal_tx.send(()).unwrap();
690
691 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
692 assert!(result.is_ok(), "Runner should process command and stop");
693 }
694
695 #[tokio::test]
696 async fn test_runner_processes_multiple_trading_commands() {
697 let (_data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
698 let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
699 let (_time_evt_tx, time_evt_rx) =
700 tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
701 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
702 let (exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
703 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
704
705 let mut runner = create_test_runner(
706 time_evt_rx,
707 data_evt_rx,
708 data_cmd_rx,
709 exec_evt_rx,
710 exec_cmd_rx,
711 signal_rx,
712 signal_tx.clone(),
713 );
714
715 let runner_handle = tokio::spawn(async move {
716 runner.run().await;
717 });
718
719 for i in 0..10 {
720 let strategy_id = format!("S-{i:03}");
721 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
722 TraderId::from("TRADER-001"),
723 None,
724 StrategyId::from(strategy_id.as_str()),
725 InstrumentId::from("EUR/USD.SIM"),
726 OrderSide::Buy,
727 UUID4::new(),
728 UnixNanos::default(),
729 None,
730 ));
731 exec_cmd_tx.send(command).unwrap();
732 }
733
734 tokio::task::yield_now().await;
735 signal_tx.send(()).unwrap();
736
737 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
738 assert!(
739 result.is_ok(),
740 "Runner should process all commands and stop"
741 );
742 }
743
744 #[tokio::test]
745 async fn test_execution_event_order_channel() {
746 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
747
748 let event = OrderSubmitted::new(
749 TraderId::from("TRADER-001"),
750 StrategyId::from("S-001"),
751 InstrumentId::from("EUR/USD.SIM"),
752 ClientOrderId::from("O-001"),
753 AccountId::from("SIM-001"),
754 UUID4::new(),
755 UnixNanos::from(1),
756 UnixNanos::from(2),
757 );
758
759 tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
760 .unwrap();
761
762 let received = rx.recv().await.unwrap();
763 match received {
764 ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
765 assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
766 }
767 _ => panic!("Expected OrderSubmitted event"),
768 }
769 }
770
771 #[tokio::test]
772 async fn test_execution_report_order_status_channel() {
773 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
774
775 let report = OrderStatusReport::new(
776 AccountId::from("SIM-001"),
777 InstrumentId::from("EUR/USD.SIM"),
778 Some(ClientOrderId::from("O-001")),
779 VenueOrderId::from("V-001"),
780 OrderSide::Buy,
781 OrderType::Market,
782 TimeInForce::Gtc,
783 OrderStatus::Accepted,
784 Quantity::from(100_000),
785 Quantity::from(100_000),
786 UnixNanos::from(1),
787 UnixNanos::from(2),
788 UnixNanos::from(3),
789 None,
790 );
791
792 tx.send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
793 report,
794 ))))
795 .unwrap();
796
797 let received = rx.recv().await.unwrap();
798 match received {
799 ExecutionEvent::Report(ExecutionReport::Order(r)) => {
800 assert_eq!(r.venue_order_id.as_str(), "V-001");
801 assert_eq!(r.order_status, OrderStatus::Accepted);
802 }
803 _ => panic!("Expected OrderStatusReport"),
804 }
805 }
806
807 #[tokio::test]
808 async fn test_execution_report_fill() {
809 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
810
811 let report = FillReport::new(
812 AccountId::from("SIM-001"),
813 InstrumentId::from("EUR/USD.SIM"),
814 VenueOrderId::from("V-001"),
815 TradeId::from("T-001"),
816 OrderSide::Buy,
817 Quantity::from(100_000),
818 Price::from("1.10000"),
819 Money::from("10 USD"),
820 LiquiditySide::Taker,
821 Some(ClientOrderId::from("O-001")),
822 None,
823 UnixNanos::from(1),
824 UnixNanos::from(2),
825 None,
826 );
827
828 tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
829 report,
830 ))))
831 .unwrap();
832
833 let received = rx.recv().await.unwrap();
834 match received {
835 ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
836 assert_eq!(r.venue_order_id.as_str(), "V-001");
837 assert_eq!(r.trade_id.to_string(), "T-001");
838 }
839 _ => panic!("Expected FillReport"),
840 }
841 }
842
843 #[tokio::test]
844 async fn test_execution_report_position() {
845 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
846
847 let report = PositionStatusReport::new(
848 AccountId::from("SIM-001"),
849 InstrumentId::from("EUR/USD.SIM"),
850 PositionSideSpecified::Long,
851 Quantity::from(100_000),
852 UnixNanos::from(1),
853 UnixNanos::from(2),
854 None,
855 Some(PositionId::from("P-001")),
856 None,
857 );
858
859 tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
860 report,
861 ))))
862 .unwrap();
863
864 let received = rx.recv().await.unwrap();
865 match received {
866 ExecutionEvent::Report(ExecutionReport::Position(r)) => {
867 assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
868 }
869 _ => panic!("Expected PositionStatusReport"),
870 }
871 }
872
873 #[tokio::test]
874 async fn test_execution_event_account() {
875 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
876
877 let account_state = AccountState::new(
878 AccountId::from("SIM-001"),
879 AccountType::Cash,
880 vec![],
881 vec![],
882 true,
883 UUID4::new(),
884 UnixNanos::from(1),
885 UnixNanos::from(2),
886 None,
887 );
888
889 tx.send(ExecutionEvent::Account(account_state)).unwrap();
890
891 let received = rx.recv().await.unwrap();
892 match received {
893 ExecutionEvent::Account(r) => {
894 assert_eq!(r.account_id.as_str(), "SIM-001");
895 }
896 _ => panic!("Expected AccountState"),
897 }
898 }
899
900 #[tokio::test]
901 async fn test_runner_stop_method() {
902 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
903 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
904 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
905 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
906 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
907 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
908
909 let mut runner = create_test_runner(
910 time_evt_rx,
911 data_evt_rx,
912 data_cmd_rx,
913 exec_evt_rx,
914 exec_cmd_rx,
915 signal_rx,
916 signal_tx.clone(),
917 );
918
919 let runner_handle = tokio::spawn(async move {
920 runner.run().await;
921 });
922
923 signal_tx.send(()).unwrap();
925
926 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
927 assert!(result.is_ok(), "Runner should stop when stop() is called");
928 }
929
930 #[tokio::test]
931 async fn test_all_event_types_integration() {
932 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
933 let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
934 let (time_evt_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
935 let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
936 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
937 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
938
939 let mut runner = create_test_runner(
940 time_evt_rx,
941 data_evt_rx,
942 data_cmd_rx,
943 exec_evt_rx,
944 exec_cmd_rx,
945 signal_rx,
946 signal_tx.clone(),
947 );
948
949 let runner_handle = tokio::spawn(async move {
950 runner.run().await;
951 });
952
953 let quote = test_quote();
955 data_evt_tx
956 .send(DataEvent::Data(Data::Quote(quote)))
957 .unwrap();
958
959 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
961 client_id: Some(ClientId::from("TEST")),
962 venue: None,
963 data_type: DataType::new("QuoteTick", None),
964 command_id: UUID4::new(),
965 ts_init: UnixNanos::default(),
966 correlation_id: None,
967 params: None,
968 }));
969 data_cmd_tx.send(command).unwrap();
970
971 let event = TimeEvent::new(
973 Ustr::from("test"),
974 UUID4::new(),
975 UnixNanos::from(1),
976 UnixNanos::from(2),
977 );
978 let callback = TimeEventCallback::from(|_: TimeEvent| {});
979 let handler = TimeEventHandler::new(event, callback);
980 time_evt_tx.send(handler).unwrap();
981
982 let order_event = OrderSubmitted::new(
984 TraderId::from("TRADER-001"),
985 StrategyId::from("S-001"),
986 InstrumentId::from("EUR/USD.SIM"),
987 ClientOrderId::from("O-001"),
988 AccountId::from("SIM-001"),
989 UUID4::new(),
990 UnixNanos::from(1),
991 UnixNanos::from(2),
992 );
993 exec_evt_tx
994 .send(ExecutionEvent::Order(OrderEventAny::Submitted(order_event)))
995 .unwrap();
996
997 let order_status = OrderStatusReport::new(
999 AccountId::from("SIM-001"),
1000 InstrumentId::from("EUR/USD.SIM"),
1001 Some(ClientOrderId::from("O-001")),
1002 VenueOrderId::from("V-001"),
1003 OrderSide::Buy,
1004 OrderType::Market,
1005 TimeInForce::Gtc,
1006 OrderStatus::Accepted,
1007 Quantity::from(100_000),
1008 Quantity::from(100_000),
1009 UnixNanos::from(1),
1010 UnixNanos::from(2),
1011 UnixNanos::from(3),
1012 None,
1013 );
1014 exec_evt_tx
1015 .send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
1016 order_status,
1017 ))))
1018 .unwrap();
1019
1020 let fill = FillReport::new(
1022 AccountId::from("SIM-001"),
1023 InstrumentId::from("EUR/USD.SIM"),
1024 VenueOrderId::from("V-001"),
1025 TradeId::from("T-001"),
1026 OrderSide::Buy,
1027 Quantity::from(100_000),
1028 Price::from("1.10000"),
1029 Money::from("10 USD"),
1030 LiquiditySide::Taker,
1031 Some(ClientOrderId::from("O-001")),
1032 None,
1033 UnixNanos::from(1),
1034 UnixNanos::from(2),
1035 None,
1036 );
1037 exec_evt_tx
1038 .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
1039 fill,
1040 ))))
1041 .unwrap();
1042
1043 let position = PositionStatusReport::new(
1045 AccountId::from("SIM-001"),
1046 InstrumentId::from("EUR/USD.SIM"),
1047 PositionSideSpecified::Long,
1048 Quantity::from(100_000),
1049 UnixNanos::from(1),
1050 UnixNanos::from(2),
1051 None,
1052 Some(PositionId::from("P-001")),
1053 None,
1054 );
1055 exec_evt_tx
1056 .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
1057 position,
1058 ))))
1059 .unwrap();
1060
1061 let account_state = AccountState::new(
1063 AccountId::from("SIM-001"),
1064 AccountType::Cash,
1065 vec![],
1066 vec![],
1067 true,
1068 UUID4::new(),
1069 UnixNanos::from(1),
1070 UnixNanos::from(2),
1071 None,
1072 );
1073 exec_evt_tx
1074 .send(ExecutionEvent::Account(account_state))
1075 .unwrap();
1076
1077 tokio::task::yield_now().await;
1079 signal_tx.send(()).unwrap();
1080
1081 let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
1082 assert!(
1083 result.is_ok(),
1084 "Runner should process all event types and stop cleanly"
1085 );
1086 }
1087
1088 #[tokio::test]
1089 async fn test_runner_handle_stops_runner() {
1090 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1091 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1092 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1093 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1094 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1095 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1096
1097 let mut runner = create_test_runner(
1098 time_evt_rx,
1099 data_evt_rx,
1100 data_cmd_rx,
1101 exec_evt_rx,
1102 exec_cmd_rx,
1103 signal_rx,
1104 signal_tx.clone(),
1105 );
1106
1107 let handle = runner.handle();
1109
1110 let runner_task = tokio::spawn(async move {
1111 runner.run().await;
1112 });
1113
1114 handle.stop();
1116
1117 let result = tokio::time::timeout(Duration::from_millis(100), runner_task).await;
1118 assert!(result.is_ok(), "Runner should stop via handle");
1119 }
1120
1121 #[tokio::test]
1122 async fn test_runner_handle_is_cloneable() {
1123 let (signal_tx, _signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1124 let handle = AsyncRunnerHandle { signal_tx };
1125
1126 let handle2 = handle.clone();
1127
1128 assert!(handle.signal_tx.send(()).is_ok());
1130 assert!(handle2.signal_tx.send(()).is_ok());
1131 }
1132
1133 #[tokio::test]
1134 async fn test_runner_processes_events_before_stop() {
1135 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1136 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1137 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1138 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1139 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1140 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1141
1142 let mut runner = create_test_runner(
1143 time_evt_rx,
1144 data_evt_rx,
1145 data_cmd_rx,
1146 exec_evt_rx,
1147 exec_cmd_rx,
1148 signal_rx,
1149 signal_tx.clone(),
1150 );
1151
1152 let handle = runner.handle();
1153
1154 for _ in 0..10 {
1156 let quote = test_quote();
1157 data_evt_tx
1158 .send(DataEvent::Data(Data::Quote(quote)))
1159 .unwrap();
1160 }
1161
1162 let runner_task = tokio::spawn(async move {
1163 runner.run().await;
1164 });
1165
1166 tokio::task::yield_now().await;
1168 handle.stop();
1169
1170 let result = tokio::time::timeout(Duration::from_millis(200), runner_task).await;
1171 assert!(result.is_ok(), "Runner should process events and stop");
1172 }
1173}