1use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19 live::runner::{set_data_event_sender, set_exec_event_sender},
20 messages::{
21 DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
22 },
23 msgbus::{self, switchboard::MessagingSwitchboard},
24 runner::{
25 DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
26 set_exec_cmd_sender, set_time_event_sender,
27 },
28 timer::TimeEventHandlerV2,
29};
30
31#[derive(Debug)]
33pub struct AsyncDataCommandSender {
34 cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
35}
36
37impl AsyncDataCommandSender {
38 #[must_use]
39 pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>) -> Self {
40 Self { cmd_tx }
41 }
42}
43
44impl DataCommandSender for AsyncDataCommandSender {
45 fn execute(&self, command: DataCommand) {
46 if let Err(e) = self.cmd_tx.send(command) {
47 log::error!("Failed to send data command: {e}");
48 }
49 }
50}
51
52#[derive(Debug, Clone)]
54pub struct AsyncTimeEventSender {
55 time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2>,
56}
57
58impl AsyncTimeEventSender {
59 #[must_use]
60 pub const fn new(time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2>) -> Self {
61 Self { time_tx }
62 }
63
64 #[must_use]
69 pub fn get_channel_sender(&self) -> tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2> {
70 self.time_tx.clone()
71 }
72}
73
74impl TimeEventSender for AsyncTimeEventSender {
75 fn send(&self, handler: TimeEventHandlerV2) {
76 if let Err(e) = self.time_tx.send(handler) {
77 log::error!("Failed to send time event handler: {e}");
78 }
79 }
80}
81
82#[derive(Debug)]
84pub struct AsyncTradingCommandSender {
85 cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
86}
87
88impl AsyncTradingCommandSender {
89 #[must_use]
90 pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>) -> Self {
91 Self { cmd_tx }
92 }
93}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96 fn execute(&self, command: TradingCommand) {
97 if let Err(e) = self.cmd_tx.send(command) {
98 log::error!("Failed to send trading command: {e}");
99 }
100 }
101}
102
103pub trait Runner {
104 fn run(&mut self);
105}
106
107#[derive(Debug)]
112pub struct AsyncRunnerChannels {
113 pub time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
114 pub data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
115 pub data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
116 pub exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
117 pub exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
118}
119
120pub struct AsyncRunner {
121 channels: AsyncRunnerChannels,
122 signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
123 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
124}
125
126#[derive(Clone, Debug)]
128pub struct AsyncRunnerHandle {
129 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
130}
131
132impl AsyncRunnerHandle {
133 pub fn stop(&self) {
135 if let Err(e) = self.signal_tx.send(()) {
136 log::error!("Failed to send shutdown signal: {e}");
137 }
138 }
139}
140
141impl Default for AsyncRunner {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147impl Debug for AsyncRunner {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct(stringify!(AsyncRunner)).finish()
150 }
151}
152
153impl AsyncRunner {
154 #[must_use]
156 pub fn new() -> Self {
157 use tokio::sync::mpsc::unbounded_channel; let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandlerV2>();
160 let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
161 let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
162 let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
163 let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
164 let (signal_tx, signal_rx) = unbounded_channel::<()>();
165
166 set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
167 set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
168 set_data_event_sender(data_evt_tx);
169 set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
170 set_exec_event_sender(exec_evt_tx);
171
172 Self {
173 channels: AsyncRunnerChannels {
174 time_evt_rx,
175 data_evt_rx,
176 data_cmd_rx,
177 exec_evt_rx,
178 exec_cmd_rx,
179 },
180 signal_rx,
181 signal_tx,
182 }
183 }
184
185 pub fn stop(&self) {
187 if let Err(e) = self.signal_tx.send(()) {
188 log::error!("Failed to send shutdown signal: {e}");
189 }
190 }
191
192 #[must_use]
194 pub fn handle(&self) -> AsyncRunnerHandle {
195 AsyncRunnerHandle {
196 signal_tx: self.signal_tx.clone(),
197 }
198 }
199
200 #[must_use]
205 pub fn take_channels(self) -> AsyncRunnerChannels {
206 self.channels
207 }
208}
209
210impl AsyncRunner {
211 pub async fn run(&mut self) {
216 log::info!("AsyncRunner starting");
217
218 loop {
219 tokio::select! {
220 Some(()) = self.signal_rx.recv() => {
221 tracing::info!("AsyncRunner received signal, shutting down");
222 return;
223 },
224 Some(handler) = self.channels.time_evt_rx.recv() => {
225 Self::handle_time_event(handler);
226 },
227 Some(cmd) = self.channels.data_cmd_rx.recv() => {
228 Self::handle_data_command(cmd);
229 },
230 Some(evt) = self.channels.data_evt_rx.recv() => {
231 Self::handle_data_event(evt);
232 },
233 Some(cmd) = self.channels.exec_cmd_rx.recv() => {
234 Self::handle_exec_command(cmd);
235 },
236 Some(evt) = self.channels.exec_evt_rx.recv() => {
237 Self::handle_exec_event(evt);
238 },
239 else => {
240 tracing::debug!("AsyncRunner all channels closed, exiting");
241 return;
242 }
243 };
244 }
245 }
246
247 #[inline]
249 pub fn handle_time_event(handler: TimeEventHandlerV2) {
250 handler.run();
251 }
252
253 #[inline]
255 pub fn handle_data_command(cmd: DataCommand) {
256 msgbus::send_any(MessagingSwitchboard::data_engine_execute(), &cmd);
257 }
258
259 #[inline]
261 pub fn handle_data_event(event: DataEvent) {
262 match event {
263 DataEvent::Data(data) => {
264 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
265 }
266 DataEvent::Instrument(data) => {
267 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
268 }
269 DataEvent::Response(resp) => {
270 msgbus::send_any(MessagingSwitchboard::data_engine_response(), &resp);
271 }
272 #[cfg(feature = "defi")]
273 DataEvent::DeFi(data) => {
274 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
275 }
276 }
277 }
278
279 #[inline]
281 pub fn handle_exec_command(cmd: TradingCommand) {
282 msgbus::send_any(MessagingSwitchboard::exec_engine_execute(), &cmd);
283 }
284
285 #[inline]
287 pub fn handle_exec_event(event: ExecutionEvent) {
288 match event {
289 ExecutionEvent::Order(ref order_event) => {
290 msgbus::send_any(MessagingSwitchboard::exec_engine_process(), order_event);
291 }
292 ExecutionEvent::Report(report) => {
293 Self::handle_exec_report(report);
294 }
295 ExecutionEvent::Account(ref account) => {
296 msgbus::send_any(MessagingSwitchboard::portfolio_update_account(), account);
297 }
298 }
299 }
300
301 #[inline]
302 pub fn handle_exec_report(report: ExecutionReport) {
303 match report {
304 ExecutionReport::OrderStatus(r) => {
305 msgbus::send_any(
306 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
307 &*r,
308 );
309 }
310 ExecutionReport::Fill(r) => {
311 msgbus::send_any(
312 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
313 &*r,
314 );
315 }
316 ExecutionReport::Position(r) => {
317 msgbus::send_any(
318 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
319 &*r,
320 );
321 }
322 ExecutionReport::Mass(r) => {
323 msgbus::send_any(
324 MessagingSwitchboard::exec_engine_reconcile_execution_mass_status(),
325 &*r,
326 );
327 }
328 }
329 }
330}
331
332#[cfg(test)]
333mod tests {
334 use std::time::Duration;
335
336 use nautilus_common::{
337 messages::{
338 ExecutionEvent, ExecutionReport,
339 data::{SubscribeCommand, SubscribeCustomData},
340 execution::TradingCommand,
341 },
342 timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
343 };
344 use nautilus_core::{UUID4, UnixNanos};
345 use nautilus_model::{
346 data::{Data, DataType, quote::QuoteTick},
347 enums::{
348 AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
349 TimeInForce,
350 },
351 events::{OrderEvent, OrderEventAny, OrderSubmitted, account::state::AccountState},
352 identifiers::{
353 AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
354 TraderId, VenueOrderId,
355 },
356 reports::{FillReport, OrderStatusReport, PositionStatusReport},
357 types::{Money, Price, Quantity},
358 };
359 use rstest::rstest;
360 use ustr::Ustr;
361
362 use super::*;
363
364 fn test_quote() -> QuoteTick {
366 QuoteTick {
367 instrument_id: InstrumentId::from("EUR/USD.SIM"),
368 bid_price: Price::from("1.10000"),
369 ask_price: Price::from("1.10001"),
370 bid_size: Quantity::from(1_000_000),
371 ask_size: Quantity::from(1_000_000),
372 ts_event: UnixNanos::default(),
373 ts_init: UnixNanos::default(),
374 }
375 }
376
377 fn create_test_runner(
379 time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
380 data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
381 data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
382 exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
383 exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
384 signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
385 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
386 ) -> AsyncRunner {
387 AsyncRunner {
388 channels: AsyncRunnerChannels {
389 time_evt_rx,
390 data_evt_rx,
391 data_cmd_rx,
392 exec_evt_rx,
393 exec_cmd_rx,
394 },
395 signal_rx,
396 signal_tx,
397 }
398 }
399
400 #[rstest]
401 fn test_async_data_command_sender_creation() {
402 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
403 let sender = AsyncDataCommandSender::new(tx);
404 assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
405 }
406
407 #[rstest]
408 fn test_async_time_event_sender_creation() {
409 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
410 let sender = AsyncTimeEventSender::new(tx);
411 assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
412 }
413
414 #[rstest]
415 fn test_async_time_event_sender_get_channel() {
416 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
417 let sender = AsyncTimeEventSender::new(tx);
418 let channel = sender.get_channel_sender();
419
420 let event = TimeEvent::new(
422 Ustr::from("test"),
423 UUID4::new(),
424 UnixNanos::from(1),
425 UnixNanos::from(2),
426 );
427 let callback = TimeEventCallback::from(|_: TimeEvent| {});
428 let handler = TimeEventHandlerV2::new(event, callback);
429
430 assert!(channel.send(handler).is_ok());
431 }
432
433 #[tokio::test]
434 async fn test_async_data_command_sender_execute() {
435 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
436 let sender = AsyncDataCommandSender::new(tx);
437
438 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
439 client_id: Some(ClientId::from("TEST")),
440 venue: None,
441 data_type: DataType::new("QuoteTick", None),
442 command_id: UUID4::new(),
443 ts_init: UnixNanos::default(),
444 params: None,
445 }));
446
447 sender.execute(command.clone());
448
449 let received = rx.recv().await.unwrap();
450 match (received, command) {
451 (
452 DataCommand::Subscribe(SubscribeCommand::Data(r)),
453 DataCommand::Subscribe(SubscribeCommand::Data(c)),
454 ) => {
455 assert_eq!(r.client_id, c.client_id);
456 assert_eq!(r.data_type, c.data_type);
457 }
458 _ => panic!("Command mismatch"),
459 }
460 }
461
462 #[tokio::test]
463 async fn test_async_time_event_sender_send() {
464 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
465 let sender = AsyncTimeEventSender::new(tx);
466
467 let event = TimeEvent::new(
468 Ustr::from("test"),
469 UUID4::new(),
470 UnixNanos::from(1),
471 UnixNanos::from(2),
472 );
473 let callback = TimeEventCallback::from(|_: TimeEvent| {});
474 let handler = TimeEventHandlerV2::new(event, callback);
475
476 sender.send(handler);
477
478 assert!(rx.recv().await.is_some());
479 }
480
481 #[tokio::test]
482 async fn test_runner_shutdown_signal() {
483 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
485 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
486 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
487 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
488 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
489 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
490
491 let mut runner = create_test_runner(
492 time_evt_rx,
493 data_evt_rx,
494 data_cmd_rx,
495 exec_evt_rx,
496 exec_cmd_rx,
497 signal_rx,
498 signal_tx.clone(),
499 );
500
501 let runner_handle = tokio::spawn(async move {
503 runner.run().await;
504 });
505
506 signal_tx.send(()).unwrap();
508
509 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
511 assert!(result.is_ok(), "Runner should stop on signal");
512 }
513
514 #[tokio::test]
515 async fn test_runner_closes_on_channel_drop() {
516 let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
517 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
518 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
519 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
520 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
521 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
522
523 let mut runner = create_test_runner(
524 time_evt_rx,
525 data_evt_rx,
526 data_cmd_rx,
527 exec_evt_rx,
528 exec_cmd_rx,
529 signal_rx,
530 signal_tx.clone(),
531 );
532
533 let runner_handle = tokio::spawn(async move {
535 runner.run().await;
536 });
537
538 drop(data_tx);
540
541 tokio::time::sleep(Duration::from_millis(50)).await;
543 signal_tx.send(()).ok();
544
545 let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
547 assert!(
548 result.is_ok(),
549 "Runner should stop when channels close or on signal"
550 );
551 }
552
553 #[tokio::test]
554 async fn test_concurrent_event_sending() {
555 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
556 let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
557 let (_time_evt_tx, time_evt_rx) =
558 tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
559 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
560 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
561 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
562
563 let mut runner = create_test_runner(
565 time_evt_rx,
566 data_evt_rx,
567 data_cmd_rx,
568 exec_evt_rx,
569 exec_cmd_rx,
570 signal_rx,
571 signal_tx.clone(),
572 );
573
574 let mut handles = vec![];
576 for _ in 0..5 {
577 let tx_clone = data_evt_tx.clone();
578 let handle = tokio::spawn(async move {
579 for _ in 0..20 {
580 let quote = test_quote();
581 tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
582 tokio::task::yield_now().await;
583 }
584 });
585 handles.push(handle);
586 }
587
588 let runner_handle = tokio::spawn(async move {
590 runner.run().await;
591 });
592
593 for handle in handles {
595 handle.await.unwrap();
596 }
597
598 tokio::time::sleep(Duration::from_millis(50)).await;
600
601 signal_tx.send(()).unwrap();
603
604 let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
605 }
606
607 #[rstest]
608 #[case(10)]
609 #[case(100)]
610 #[case(1000)]
611 fn test_channel_send_performance(#[case] count: usize) {
612 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
613 let quote = test_quote();
614
615 for _ in 0..count {
617 tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
618 }
619
620 let mut received = 0;
622 while rx.try_recv().is_ok() {
623 received += 1;
624 }
625
626 assert_eq!(received, count);
627 }
628
629 #[rstest]
630 fn test_async_trading_command_sender_creation() {
631 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
632 let sender = AsyncTradingCommandSender::new(tx);
633 assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
634 }
635
636 #[tokio::test]
637 async fn test_execution_event_order_channel() {
638 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
639
640 let event = OrderSubmitted::new(
641 TraderId::from("TRADER-001"),
642 StrategyId::from("S-001"),
643 InstrumentId::from("EUR/USD.SIM"),
644 ClientOrderId::from("O-001"),
645 AccountId::from("SIM-001"),
646 UUID4::new(),
647 UnixNanos::from(1),
648 UnixNanos::from(2),
649 );
650
651 tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
652 .unwrap();
653
654 let received = rx.recv().await.unwrap();
655 match received {
656 ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
657 assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
658 }
659 _ => panic!("Expected OrderSubmitted event"),
660 }
661 }
662
663 #[tokio::test]
664 async fn test_execution_report_order_status_channel() {
665 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
666
667 let report = OrderStatusReport::new(
668 AccountId::from("SIM-001"),
669 InstrumentId::from("EUR/USD.SIM"),
670 Some(ClientOrderId::from("O-001")),
671 VenueOrderId::from("V-001"),
672 OrderSide::Buy,
673 OrderType::Market,
674 TimeInForce::Gtc,
675 OrderStatus::Accepted,
676 Quantity::from(100_000),
677 Quantity::from(100_000),
678 UnixNanos::from(1),
679 UnixNanos::from(2),
680 UnixNanos::from(3),
681 None,
682 );
683
684 tx.send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
685 Box::new(report),
686 )))
687 .unwrap();
688
689 let received = rx.recv().await.unwrap();
690 match received {
691 ExecutionEvent::Report(ExecutionReport::OrderStatus(r)) => {
692 assert_eq!(r.venue_order_id.as_str(), "V-001");
693 assert_eq!(r.order_status, OrderStatus::Accepted);
694 }
695 _ => panic!("Expected OrderStatusReport"),
696 }
697 }
698
699 #[tokio::test]
700 async fn test_execution_report_fill() {
701 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
702
703 let report = FillReport::new(
704 AccountId::from("SIM-001"),
705 InstrumentId::from("EUR/USD.SIM"),
706 VenueOrderId::from("V-001"),
707 TradeId::from("T-001"),
708 OrderSide::Buy,
709 Quantity::from(100_000),
710 Price::from("1.10000"),
711 Money::from("10 USD"),
712 LiquiditySide::Taker,
713 Some(ClientOrderId::from("O-001")),
714 None,
715 UnixNanos::from(1),
716 UnixNanos::from(2),
717 None,
718 );
719
720 tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
721 report,
722 ))))
723 .unwrap();
724
725 let received = rx.recv().await.unwrap();
726 match received {
727 ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
728 assert_eq!(r.venue_order_id.as_str(), "V-001");
729 assert_eq!(r.trade_id.to_string(), "T-001");
730 }
731 _ => panic!("Expected FillReport"),
732 }
733 }
734
735 #[tokio::test]
736 async fn test_execution_report_position() {
737 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
738
739 let report = PositionStatusReport::new(
740 AccountId::from("SIM-001"),
741 InstrumentId::from("EUR/USD.SIM"),
742 PositionSideSpecified::Long,
743 Quantity::from(100_000),
744 UnixNanos::from(1),
745 UnixNanos::from(2),
746 None,
747 Some(PositionId::from("P-001")),
748 None,
749 );
750
751 tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
752 report,
753 ))))
754 .unwrap();
755
756 let received = rx.recv().await.unwrap();
757 match received {
758 ExecutionEvent::Report(ExecutionReport::Position(r)) => {
759 assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
760 }
761 _ => panic!("Expected PositionStatusReport"),
762 }
763 }
764
765 #[tokio::test]
766 async fn test_execution_event_account() {
767 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
768
769 let account_state = AccountState::new(
770 AccountId::from("SIM-001"),
771 AccountType::Cash,
772 vec![],
773 vec![],
774 true,
775 UUID4::new(),
776 UnixNanos::from(1),
777 UnixNanos::from(2),
778 None,
779 );
780
781 tx.send(ExecutionEvent::Account(account_state)).unwrap();
782
783 let received = rx.recv().await.unwrap();
784 match received {
785 ExecutionEvent::Account(r) => {
786 assert_eq!(r.account_id.as_str(), "SIM-001");
787 }
788 _ => panic!("Expected AccountState"),
789 }
790 }
791
792 #[tokio::test]
793 async fn test_runner_stop_method() {
794 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
795 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
796 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
797 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
798 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
799 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
800
801 let mut runner = create_test_runner(
802 time_evt_rx,
803 data_evt_rx,
804 data_cmd_rx,
805 exec_evt_rx,
806 exec_cmd_rx,
807 signal_rx,
808 signal_tx.clone(),
809 );
810
811 let runner_handle = tokio::spawn(async move {
812 runner.run().await;
813 });
814
815 signal_tx.send(()).unwrap();
817
818 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
819 assert!(result.is_ok(), "Runner should stop when stop() is called");
820 }
821
822 #[tokio::test]
823 async fn test_all_event_types_integration() {
824 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
825 let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
826 let (time_evt_tx, time_evt_rx) =
827 tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
828 let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
829 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
830 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
831
832 let mut runner = create_test_runner(
833 time_evt_rx,
834 data_evt_rx,
835 data_cmd_rx,
836 exec_evt_rx,
837 exec_cmd_rx,
838 signal_rx,
839 signal_tx.clone(),
840 );
841
842 let runner_handle = tokio::spawn(async move {
843 runner.run().await;
844 });
845
846 let quote = test_quote();
848 data_evt_tx
849 .send(DataEvent::Data(Data::Quote(quote)))
850 .unwrap();
851
852 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
854 client_id: Some(ClientId::from("TEST")),
855 venue: None,
856 data_type: DataType::new("QuoteTick", None),
857 command_id: UUID4::new(),
858 ts_init: UnixNanos::default(),
859 params: None,
860 }));
861 data_cmd_tx.send(command).unwrap();
862
863 let event = TimeEvent::new(
865 Ustr::from("test"),
866 UUID4::new(),
867 UnixNanos::from(1),
868 UnixNanos::from(2),
869 );
870 let callback = TimeEventCallback::from(|_: TimeEvent| {});
871 let handler = TimeEventHandlerV2::new(event, callback);
872 time_evt_tx.send(handler).unwrap();
873
874 let order_event = OrderSubmitted::new(
876 TraderId::from("TRADER-001"),
877 StrategyId::from("S-001"),
878 InstrumentId::from("EUR/USD.SIM"),
879 ClientOrderId::from("O-001"),
880 AccountId::from("SIM-001"),
881 UUID4::new(),
882 UnixNanos::from(1),
883 UnixNanos::from(2),
884 );
885 exec_evt_tx
886 .send(ExecutionEvent::Order(OrderEventAny::Submitted(order_event)))
887 .unwrap();
888
889 let order_status = OrderStatusReport::new(
891 AccountId::from("SIM-001"),
892 InstrumentId::from("EUR/USD.SIM"),
893 Some(ClientOrderId::from("O-001")),
894 VenueOrderId::from("V-001"),
895 OrderSide::Buy,
896 OrderType::Market,
897 TimeInForce::Gtc,
898 OrderStatus::Accepted,
899 Quantity::from(100_000),
900 Quantity::from(100_000),
901 UnixNanos::from(1),
902 UnixNanos::from(2),
903 UnixNanos::from(3),
904 None,
905 );
906 exec_evt_tx
907 .send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
908 Box::new(order_status),
909 )))
910 .unwrap();
911
912 let fill = FillReport::new(
914 AccountId::from("SIM-001"),
915 InstrumentId::from("EUR/USD.SIM"),
916 VenueOrderId::from("V-001"),
917 TradeId::from("T-001"),
918 OrderSide::Buy,
919 Quantity::from(100_000),
920 Price::from("1.10000"),
921 Money::from("10 USD"),
922 LiquiditySide::Taker,
923 Some(ClientOrderId::from("O-001")),
924 None,
925 UnixNanos::from(1),
926 UnixNanos::from(2),
927 None,
928 );
929 exec_evt_tx
930 .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
931 fill,
932 ))))
933 .unwrap();
934
935 let position = PositionStatusReport::new(
937 AccountId::from("SIM-001"),
938 InstrumentId::from("EUR/USD.SIM"),
939 PositionSideSpecified::Long,
940 Quantity::from(100_000),
941 UnixNanos::from(1),
942 UnixNanos::from(2),
943 None,
944 Some(PositionId::from("P-001")),
945 None,
946 );
947 exec_evt_tx
948 .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
949 position,
950 ))))
951 .unwrap();
952
953 let account_state = AccountState::new(
955 AccountId::from("SIM-001"),
956 AccountType::Cash,
957 vec![],
958 vec![],
959 true,
960 UUID4::new(),
961 UnixNanos::from(1),
962 UnixNanos::from(2),
963 None,
964 );
965 exec_evt_tx
966 .send(ExecutionEvent::Account(account_state))
967 .unwrap();
968
969 tokio::time::sleep(Duration::from_millis(100)).await;
971
972 signal_tx.send(()).unwrap();
974
975 let result = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
976 assert!(
977 result.is_ok(),
978 "Runner should process all event types and stop cleanly"
979 );
980 }
981
982 #[tokio::test]
983 async fn test_runner_handle_stops_runner() {
984 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
985 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
986 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
987 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
988 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
989 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
990
991 let mut runner = create_test_runner(
992 time_evt_rx,
993 data_evt_rx,
994 data_cmd_rx,
995 exec_evt_rx,
996 exec_cmd_rx,
997 signal_rx,
998 signal_tx.clone(),
999 );
1000
1001 let handle = runner.handle();
1003
1004 let runner_task = tokio::spawn(async move {
1005 runner.run().await;
1006 });
1007
1008 handle.stop();
1010
1011 let result = tokio::time::timeout(Duration::from_millis(100), runner_task).await;
1012 assert!(result.is_ok(), "Runner should stop via handle");
1013 }
1014
1015 #[tokio::test]
1016 async fn test_runner_handle_is_cloneable() {
1017 let (signal_tx, _signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1018 let handle = AsyncRunnerHandle { signal_tx };
1019
1020 let handle2 = handle.clone();
1021
1022 assert!(handle.signal_tx.send(()).is_ok());
1024 assert!(handle2.signal_tx.send(()).is_ok());
1025 }
1026
1027 #[tokio::test]
1028 async fn test_runner_processes_events_before_stop() {
1029 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1030 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1031 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
1032 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1033 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1034 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1035
1036 let mut runner = create_test_runner(
1037 time_evt_rx,
1038 data_evt_rx,
1039 data_cmd_rx,
1040 exec_evt_rx,
1041 exec_cmd_rx,
1042 signal_rx,
1043 signal_tx.clone(),
1044 );
1045
1046 let handle = runner.handle();
1047
1048 for _ in 0..10 {
1050 let quote = test_quote();
1051 data_evt_tx
1052 .send(DataEvent::Data(Data::Quote(quote)))
1053 .unwrap();
1054 }
1055
1056 let runner_task = tokio::spawn(async move {
1057 runner.run().await;
1058 });
1059
1060 tokio::time::sleep(Duration::from_millis(50)).await;
1062
1063 handle.stop();
1065
1066 let result = tokio::time::timeout(Duration::from_millis(200), runner_task).await;
1067 assert!(result.is_ok(), "Runner should process events and stop");
1068 }
1069}