1use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19 live::runner::{set_data_event_sender, set_exec_event_sender},
20 messages::{
21 DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
22 },
23 msgbus::{self, switchboard::MessagingSwitchboard},
24 runner::{
25 DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
26 set_exec_cmd_sender, set_time_event_sender,
27 },
28 timer::TimeEventHandlerV2,
29};
30
31#[derive(Debug)]
33pub struct AsyncDataCommandSender {
34 cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
35}
36
37impl AsyncDataCommandSender {
38 #[must_use]
39 pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>) -> Self {
40 Self { cmd_tx }
41 }
42}
43
44impl DataCommandSender for AsyncDataCommandSender {
45 fn execute(&self, command: DataCommand) {
46 if let Err(e) = self.cmd_tx.send(command) {
47 log::error!("Failed to send data command: {e}");
48 }
49 }
50}
51
52#[derive(Debug, Clone)]
54pub struct AsyncTimeEventSender {
55 time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2>,
56}
57
58impl AsyncTimeEventSender {
59 #[must_use]
60 pub const fn new(time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2>) -> Self {
61 Self { time_tx }
62 }
63
64 #[must_use]
69 pub fn get_channel_sender(&self) -> tokio::sync::mpsc::UnboundedSender<TimeEventHandlerV2> {
70 self.time_tx.clone()
71 }
72}
73
74impl TimeEventSender for AsyncTimeEventSender {
75 fn send(&self, handler: TimeEventHandlerV2) {
76 if let Err(e) = self.time_tx.send(handler) {
77 log::error!("Failed to send time event handler: {e}");
78 }
79 }
80}
81
82#[derive(Debug)]
84pub struct AsyncTradingCommandSender {
85 cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
86}
87
88impl AsyncTradingCommandSender {
89 #[must_use]
90 pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>) -> Self {
91 Self { cmd_tx }
92 }
93}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96 fn execute(&self, command: TradingCommand) {
97 if let Err(e) = self.cmd_tx.send(command) {
98 log::error!("Failed to send trading command: {e}");
99 }
100 }
101}
102
103pub trait Runner {
104 fn run(&mut self);
105}
106
107#[derive(Debug)]
112pub struct AsyncRunnerChannels {
113 pub time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
114 pub data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
115 pub data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
116 pub exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
117 pub exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
118}
119
120pub struct AsyncRunner {
121 channels: AsyncRunnerChannels,
122 signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
123 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
124}
125
126#[derive(Clone, Debug)]
128pub struct AsyncRunnerHandle {
129 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
130}
131
132impl AsyncRunnerHandle {
133 pub fn stop(&self) {
135 if let Err(e) = self.signal_tx.send(()) {
136 log::error!("Failed to send shutdown signal: {e}");
137 }
138 }
139}
140
141impl Default for AsyncRunner {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147impl Debug for AsyncRunner {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct(stringify!(AsyncRunner)).finish()
150 }
151}
152
153impl AsyncRunner {
154 #[must_use]
156 pub fn new() -> Self {
157 use tokio::sync::mpsc::unbounded_channel; let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandlerV2>();
160 let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
161 let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
162 let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
163 let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
164 let (signal_tx, signal_rx) = unbounded_channel::<()>();
165
166 set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
167 set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
168 set_data_event_sender(data_evt_tx);
169 set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
170 set_exec_event_sender(exec_evt_tx);
171
172 Self {
173 channels: AsyncRunnerChannels {
174 time_evt_rx,
175 data_evt_rx,
176 data_cmd_rx,
177 exec_evt_rx,
178 exec_cmd_rx,
179 },
180 signal_rx,
181 signal_tx,
182 }
183 }
184
185 pub fn stop(&self) {
187 if let Err(e) = self.signal_tx.send(()) {
188 log::error!("Failed to send shutdown signal: {e}");
189 }
190 }
191
192 #[must_use]
194 pub fn handle(&self) -> AsyncRunnerHandle {
195 AsyncRunnerHandle {
196 signal_tx: self.signal_tx.clone(),
197 }
198 }
199
200 #[must_use]
205 pub fn take_channels(self) -> AsyncRunnerChannels {
206 self.channels
207 }
208
209 pub async fn run(&mut self) {
214 log::info!("AsyncRunner starting");
215
216 loop {
217 tokio::select! {
218 Some(()) = self.signal_rx.recv() => {
219 tracing::info!("AsyncRunner received signal, shutting down");
220 return;
221 },
222 Some(handler) = self.channels.time_evt_rx.recv() => {
223 Self::handle_time_event(handler);
224 },
225 Some(cmd) = self.channels.data_cmd_rx.recv() => {
226 Self::handle_data_command(cmd);
227 },
228 Some(evt) = self.channels.data_evt_rx.recv() => {
229 Self::handle_data_event(evt);
230 },
231 Some(cmd) = self.channels.exec_cmd_rx.recv() => {
232 Self::handle_exec_command(cmd);
233 },
234 Some(evt) = self.channels.exec_evt_rx.recv() => {
235 Self::handle_exec_event(evt);
236 },
237 else => {
238 tracing::debug!("AsyncRunner all channels closed, exiting");
239 return;
240 }
241 };
242 }
243 }
244
245 #[inline]
247 pub fn handle_time_event(handler: TimeEventHandlerV2) {
248 handler.run();
249 }
250
251 #[inline]
253 pub fn handle_data_command(cmd: DataCommand) {
254 msgbus::send_any(MessagingSwitchboard::data_engine_execute(), &cmd);
255 }
256
257 #[inline]
259 pub fn handle_data_event(event: DataEvent) {
260 match event {
261 DataEvent::Data(data) => {
262 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
263 }
264 DataEvent::Instrument(data) => {
265 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
266 }
267 DataEvent::Response(resp) => {
268 msgbus::send_any(MessagingSwitchboard::data_engine_response(), &resp);
269 }
270 #[cfg(feature = "defi")]
271 DataEvent::DeFi(data) => {
272 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
273 }
274 }
275 }
276
277 #[inline]
279 pub fn handle_exec_command(cmd: TradingCommand) {
280 msgbus::send_any(MessagingSwitchboard::exec_engine_execute(), &cmd);
281 }
282
283 #[inline]
285 pub fn handle_exec_event(event: ExecutionEvent) {
286 match event {
287 ExecutionEvent::Order(ref order_event) => {
288 msgbus::send_any(MessagingSwitchboard::exec_engine_process(), order_event);
289 }
290 ExecutionEvent::Report(report) => {
291 Self::handle_exec_report(report);
292 }
293 ExecutionEvent::Account(ref account) => {
294 msgbus::send_any(MessagingSwitchboard::portfolio_update_account(), account);
295 }
296 }
297 }
298
299 #[inline]
300 pub fn handle_exec_report(report: ExecutionReport) {
301 match report {
302 ExecutionReport::OrderStatus(r) => {
303 msgbus::send_any(
304 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
305 &*r,
306 );
307 }
308 ExecutionReport::Fill(r) => {
309 msgbus::send_any(
310 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
311 &*r,
312 );
313 }
314 ExecutionReport::Position(r) => {
315 msgbus::send_any(
316 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
317 &*r,
318 );
319 }
320 ExecutionReport::Mass(r) => {
321 msgbus::send_any(
322 MessagingSwitchboard::exec_engine_reconcile_execution_mass_status(),
323 &*r,
324 );
325 }
326 }
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use std::time::Duration;
333
334 use nautilus_common::{
335 messages::{
336 ExecutionEvent, ExecutionReport,
337 data::{SubscribeCommand, SubscribeCustomData},
338 execution::TradingCommand,
339 },
340 timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
341 };
342 use nautilus_core::{UUID4, UnixNanos};
343 use nautilus_model::{
344 data::{Data, DataType, quote::QuoteTick},
345 enums::{
346 AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
347 TimeInForce,
348 },
349 events::{OrderEvent, OrderEventAny, OrderSubmitted, account::state::AccountState},
350 identifiers::{
351 AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
352 TraderId, VenueOrderId,
353 },
354 reports::{FillReport, OrderStatusReport, PositionStatusReport},
355 types::{Money, Price, Quantity},
356 };
357 use rstest::rstest;
358 use ustr::Ustr;
359
360 use super::*;
361
362 fn test_quote() -> QuoteTick {
364 QuoteTick {
365 instrument_id: InstrumentId::from("EUR/USD.SIM"),
366 bid_price: Price::from("1.10000"),
367 ask_price: Price::from("1.10001"),
368 bid_size: Quantity::from(1_000_000),
369 ask_size: Quantity::from(1_000_000),
370 ts_event: UnixNanos::default(),
371 ts_init: UnixNanos::default(),
372 }
373 }
374
375 fn create_test_runner(
377 time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandlerV2>,
378 data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
379 data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
380 exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
381 exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
382 signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
383 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
384 ) -> AsyncRunner {
385 AsyncRunner {
386 channels: AsyncRunnerChannels {
387 time_evt_rx,
388 data_evt_rx,
389 data_cmd_rx,
390 exec_evt_rx,
391 exec_cmd_rx,
392 },
393 signal_rx,
394 signal_tx,
395 }
396 }
397
398 #[rstest]
399 fn test_async_data_command_sender_creation() {
400 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
401 let sender = AsyncDataCommandSender::new(tx);
402 assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
403 }
404
405 #[rstest]
406 fn test_async_time_event_sender_creation() {
407 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
408 let sender = AsyncTimeEventSender::new(tx);
409 assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
410 }
411
412 #[rstest]
413 fn test_async_time_event_sender_get_channel() {
414 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
415 let sender = AsyncTimeEventSender::new(tx);
416 let channel = sender.get_channel_sender();
417
418 let event = TimeEvent::new(
420 Ustr::from("test"),
421 UUID4::new(),
422 UnixNanos::from(1),
423 UnixNanos::from(2),
424 );
425 let callback = TimeEventCallback::from(|_: TimeEvent| {});
426 let handler = TimeEventHandlerV2::new(event, callback);
427
428 assert!(channel.send(handler).is_ok());
429 }
430
431 #[tokio::test]
432 async fn test_async_data_command_sender_execute() {
433 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
434 let sender = AsyncDataCommandSender::new(tx);
435
436 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
437 client_id: Some(ClientId::from("TEST")),
438 venue: None,
439 data_type: DataType::new("QuoteTick", None),
440 command_id: UUID4::new(),
441 ts_init: UnixNanos::default(),
442 params: None,
443 }));
444
445 sender.execute(command.clone());
446
447 let received = rx.recv().await.unwrap();
448 match (received, command) {
449 (
450 DataCommand::Subscribe(SubscribeCommand::Data(r)),
451 DataCommand::Subscribe(SubscribeCommand::Data(c)),
452 ) => {
453 assert_eq!(r.client_id, c.client_id);
454 assert_eq!(r.data_type, c.data_type);
455 }
456 _ => panic!("Command mismatch"),
457 }
458 }
459
460 #[tokio::test]
461 async fn test_async_time_event_sender_send() {
462 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
463 let sender = AsyncTimeEventSender::new(tx);
464
465 let event = TimeEvent::new(
466 Ustr::from("test"),
467 UUID4::new(),
468 UnixNanos::from(1),
469 UnixNanos::from(2),
470 );
471 let callback = TimeEventCallback::from(|_: TimeEvent| {});
472 let handler = TimeEventHandlerV2::new(event, callback);
473
474 sender.send(handler);
475
476 assert!(rx.recv().await.is_some());
477 }
478
479 #[tokio::test]
480 async fn test_runner_shutdown_signal() {
481 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
483 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
484 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
485 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
486 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
487 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
488
489 let mut runner = create_test_runner(
490 time_evt_rx,
491 data_evt_rx,
492 data_cmd_rx,
493 exec_evt_rx,
494 exec_cmd_rx,
495 signal_rx,
496 signal_tx.clone(),
497 );
498
499 let runner_handle = tokio::spawn(async move {
501 runner.run().await;
502 });
503
504 signal_tx.send(()).unwrap();
506
507 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
509 assert!(result.is_ok(), "Runner should stop on signal");
510 }
511
512 #[tokio::test]
513 async fn test_runner_closes_on_channel_drop() {
514 let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
515 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
516 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
517 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
518 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
519 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
520
521 let mut runner = create_test_runner(
522 time_evt_rx,
523 data_evt_rx,
524 data_cmd_rx,
525 exec_evt_rx,
526 exec_cmd_rx,
527 signal_rx,
528 signal_tx.clone(),
529 );
530
531 let runner_handle = tokio::spawn(async move {
533 runner.run().await;
534 });
535
536 drop(data_tx);
538
539 tokio::time::sleep(Duration::from_millis(50)).await;
541 signal_tx.send(()).ok();
542
543 let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
545 assert!(
546 result.is_ok(),
547 "Runner should stop when channels close or on signal"
548 );
549 }
550
551 #[tokio::test]
552 async fn test_concurrent_event_sending() {
553 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
554 let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
555 let (_time_evt_tx, time_evt_rx) =
556 tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
557 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
558 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
559 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
560
561 let mut runner = create_test_runner(
563 time_evt_rx,
564 data_evt_rx,
565 data_cmd_rx,
566 exec_evt_rx,
567 exec_cmd_rx,
568 signal_rx,
569 signal_tx.clone(),
570 );
571
572 let mut handles = vec![];
574 for _ in 0..5 {
575 let tx_clone = data_evt_tx.clone();
576 let handle = tokio::spawn(async move {
577 for _ in 0..20 {
578 let quote = test_quote();
579 tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
580 tokio::task::yield_now().await;
581 }
582 });
583 handles.push(handle);
584 }
585
586 let runner_handle = tokio::spawn(async move {
588 runner.run().await;
589 });
590
591 for handle in handles {
593 handle.await.unwrap();
594 }
595
596 tokio::time::sleep(Duration::from_millis(50)).await;
598
599 signal_tx.send(()).unwrap();
601
602 let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
603 }
604
605 #[rstest]
606 #[case(10)]
607 #[case(100)]
608 #[case(1000)]
609 fn test_channel_send_performance(#[case] count: usize) {
610 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
611 let quote = test_quote();
612
613 for _ in 0..count {
615 tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
616 }
617
618 let mut received = 0;
620 while rx.try_recv().is_ok() {
621 received += 1;
622 }
623
624 assert_eq!(received, count);
625 }
626
627 #[rstest]
628 fn test_async_trading_command_sender_creation() {
629 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
630 let sender = AsyncTradingCommandSender::new(tx);
631 assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
632 }
633
634 #[tokio::test]
635 async fn test_execution_event_order_channel() {
636 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
637
638 let event = OrderSubmitted::new(
639 TraderId::from("TRADER-001"),
640 StrategyId::from("S-001"),
641 InstrumentId::from("EUR/USD.SIM"),
642 ClientOrderId::from("O-001"),
643 AccountId::from("SIM-001"),
644 UUID4::new(),
645 UnixNanos::from(1),
646 UnixNanos::from(2),
647 );
648
649 tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
650 .unwrap();
651
652 let received = rx.recv().await.unwrap();
653 match received {
654 ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
655 assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
656 }
657 _ => panic!("Expected OrderSubmitted event"),
658 }
659 }
660
661 #[tokio::test]
662 async fn test_execution_report_order_status_channel() {
663 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
664
665 let report = OrderStatusReport::new(
666 AccountId::from("SIM-001"),
667 InstrumentId::from("EUR/USD.SIM"),
668 Some(ClientOrderId::from("O-001")),
669 VenueOrderId::from("V-001"),
670 OrderSide::Buy,
671 OrderType::Market,
672 TimeInForce::Gtc,
673 OrderStatus::Accepted,
674 Quantity::from(100_000),
675 Quantity::from(100_000),
676 UnixNanos::from(1),
677 UnixNanos::from(2),
678 UnixNanos::from(3),
679 None,
680 );
681
682 tx.send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
683 Box::new(report),
684 )))
685 .unwrap();
686
687 let received = rx.recv().await.unwrap();
688 match received {
689 ExecutionEvent::Report(ExecutionReport::OrderStatus(r)) => {
690 assert_eq!(r.venue_order_id.as_str(), "V-001");
691 assert_eq!(r.order_status, OrderStatus::Accepted);
692 }
693 _ => panic!("Expected OrderStatusReport"),
694 }
695 }
696
697 #[tokio::test]
698 async fn test_execution_report_fill() {
699 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
700
701 let report = FillReport::new(
702 AccountId::from("SIM-001"),
703 InstrumentId::from("EUR/USD.SIM"),
704 VenueOrderId::from("V-001"),
705 TradeId::from("T-001"),
706 OrderSide::Buy,
707 Quantity::from(100_000),
708 Price::from("1.10000"),
709 Money::from("10 USD"),
710 LiquiditySide::Taker,
711 Some(ClientOrderId::from("O-001")),
712 None,
713 UnixNanos::from(1),
714 UnixNanos::from(2),
715 None,
716 );
717
718 tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
719 report,
720 ))))
721 .unwrap();
722
723 let received = rx.recv().await.unwrap();
724 match received {
725 ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
726 assert_eq!(r.venue_order_id.as_str(), "V-001");
727 assert_eq!(r.trade_id.to_string(), "T-001");
728 }
729 _ => panic!("Expected FillReport"),
730 }
731 }
732
733 #[tokio::test]
734 async fn test_execution_report_position() {
735 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
736
737 let report = PositionStatusReport::new(
738 AccountId::from("SIM-001"),
739 InstrumentId::from("EUR/USD.SIM"),
740 PositionSideSpecified::Long,
741 Quantity::from(100_000),
742 UnixNanos::from(1),
743 UnixNanos::from(2),
744 None,
745 Some(PositionId::from("P-001")),
746 None,
747 );
748
749 tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
750 report,
751 ))))
752 .unwrap();
753
754 let received = rx.recv().await.unwrap();
755 match received {
756 ExecutionEvent::Report(ExecutionReport::Position(r)) => {
757 assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
758 }
759 _ => panic!("Expected PositionStatusReport"),
760 }
761 }
762
763 #[tokio::test]
764 async fn test_execution_event_account() {
765 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
766
767 let account_state = AccountState::new(
768 AccountId::from("SIM-001"),
769 AccountType::Cash,
770 vec![],
771 vec![],
772 true,
773 UUID4::new(),
774 UnixNanos::from(1),
775 UnixNanos::from(2),
776 None,
777 );
778
779 tx.send(ExecutionEvent::Account(account_state)).unwrap();
780
781 let received = rx.recv().await.unwrap();
782 match received {
783 ExecutionEvent::Account(r) => {
784 assert_eq!(r.account_id.as_str(), "SIM-001");
785 }
786 _ => panic!("Expected AccountState"),
787 }
788 }
789
790 #[tokio::test]
791 async fn test_runner_stop_method() {
792 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
793 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
794 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
795 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
796 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
797 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
798
799 let mut runner = create_test_runner(
800 time_evt_rx,
801 data_evt_rx,
802 data_cmd_rx,
803 exec_evt_rx,
804 exec_cmd_rx,
805 signal_rx,
806 signal_tx.clone(),
807 );
808
809 let runner_handle = tokio::spawn(async move {
810 runner.run().await;
811 });
812
813 signal_tx.send(()).unwrap();
815
816 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
817 assert!(result.is_ok(), "Runner should stop when stop() is called");
818 }
819
820 #[tokio::test]
821 async fn test_all_event_types_integration() {
822 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
823 let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
824 let (time_evt_tx, time_evt_rx) =
825 tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
826 let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
827 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
828 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
829
830 let mut runner = create_test_runner(
831 time_evt_rx,
832 data_evt_rx,
833 data_cmd_rx,
834 exec_evt_rx,
835 exec_cmd_rx,
836 signal_rx,
837 signal_tx.clone(),
838 );
839
840 let runner_handle = tokio::spawn(async move {
841 runner.run().await;
842 });
843
844 let quote = test_quote();
846 data_evt_tx
847 .send(DataEvent::Data(Data::Quote(quote)))
848 .unwrap();
849
850 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
852 client_id: Some(ClientId::from("TEST")),
853 venue: None,
854 data_type: DataType::new("QuoteTick", None),
855 command_id: UUID4::new(),
856 ts_init: UnixNanos::default(),
857 params: None,
858 }));
859 data_cmd_tx.send(command).unwrap();
860
861 let event = TimeEvent::new(
863 Ustr::from("test"),
864 UUID4::new(),
865 UnixNanos::from(1),
866 UnixNanos::from(2),
867 );
868 let callback = TimeEventCallback::from(|_: TimeEvent| {});
869 let handler = TimeEventHandlerV2::new(event, callback);
870 time_evt_tx.send(handler).unwrap();
871
872 let order_event = OrderSubmitted::new(
874 TraderId::from("TRADER-001"),
875 StrategyId::from("S-001"),
876 InstrumentId::from("EUR/USD.SIM"),
877 ClientOrderId::from("O-001"),
878 AccountId::from("SIM-001"),
879 UUID4::new(),
880 UnixNanos::from(1),
881 UnixNanos::from(2),
882 );
883 exec_evt_tx
884 .send(ExecutionEvent::Order(OrderEventAny::Submitted(order_event)))
885 .unwrap();
886
887 let order_status = OrderStatusReport::new(
889 AccountId::from("SIM-001"),
890 InstrumentId::from("EUR/USD.SIM"),
891 Some(ClientOrderId::from("O-001")),
892 VenueOrderId::from("V-001"),
893 OrderSide::Buy,
894 OrderType::Market,
895 TimeInForce::Gtc,
896 OrderStatus::Accepted,
897 Quantity::from(100_000),
898 Quantity::from(100_000),
899 UnixNanos::from(1),
900 UnixNanos::from(2),
901 UnixNanos::from(3),
902 None,
903 );
904 exec_evt_tx
905 .send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
906 Box::new(order_status),
907 )))
908 .unwrap();
909
910 let fill = FillReport::new(
912 AccountId::from("SIM-001"),
913 InstrumentId::from("EUR/USD.SIM"),
914 VenueOrderId::from("V-001"),
915 TradeId::from("T-001"),
916 OrderSide::Buy,
917 Quantity::from(100_000),
918 Price::from("1.10000"),
919 Money::from("10 USD"),
920 LiquiditySide::Taker,
921 Some(ClientOrderId::from("O-001")),
922 None,
923 UnixNanos::from(1),
924 UnixNanos::from(2),
925 None,
926 );
927 exec_evt_tx
928 .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
929 fill,
930 ))))
931 .unwrap();
932
933 let position = PositionStatusReport::new(
935 AccountId::from("SIM-001"),
936 InstrumentId::from("EUR/USD.SIM"),
937 PositionSideSpecified::Long,
938 Quantity::from(100_000),
939 UnixNanos::from(1),
940 UnixNanos::from(2),
941 None,
942 Some(PositionId::from("P-001")),
943 None,
944 );
945 exec_evt_tx
946 .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
947 position,
948 ))))
949 .unwrap();
950
951 let account_state = AccountState::new(
953 AccountId::from("SIM-001"),
954 AccountType::Cash,
955 vec![],
956 vec![],
957 true,
958 UUID4::new(),
959 UnixNanos::from(1),
960 UnixNanos::from(2),
961 None,
962 );
963 exec_evt_tx
964 .send(ExecutionEvent::Account(account_state))
965 .unwrap();
966
967 tokio::time::sleep(Duration::from_millis(100)).await;
969
970 signal_tx.send(()).unwrap();
972
973 let result = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
974 assert!(
975 result.is_ok(),
976 "Runner should process all event types and stop cleanly"
977 );
978 }
979
980 #[tokio::test]
981 async fn test_runner_handle_stops_runner() {
982 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
983 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
984 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
985 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
986 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
987 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
988
989 let mut runner = create_test_runner(
990 time_evt_rx,
991 data_evt_rx,
992 data_cmd_rx,
993 exec_evt_rx,
994 exec_cmd_rx,
995 signal_rx,
996 signal_tx.clone(),
997 );
998
999 let handle = runner.handle();
1001
1002 let runner_task = tokio::spawn(async move {
1003 runner.run().await;
1004 });
1005
1006 handle.stop();
1008
1009 let result = tokio::time::timeout(Duration::from_millis(100), runner_task).await;
1010 assert!(result.is_ok(), "Runner should stop via handle");
1011 }
1012
1013 #[tokio::test]
1014 async fn test_runner_handle_is_cloneable() {
1015 let (signal_tx, _signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1016 let handle = AsyncRunnerHandle { signal_tx };
1017
1018 let handle2 = handle.clone();
1019
1020 assert!(handle.signal_tx.send(()).is_ok());
1022 assert!(handle2.signal_tx.send(()).is_ok());
1023 }
1024
1025 #[tokio::test]
1026 async fn test_runner_processes_events_before_stop() {
1027 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1028 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1029 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
1030 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1031 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1032 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1033
1034 let mut runner = create_test_runner(
1035 time_evt_rx,
1036 data_evt_rx,
1037 data_cmd_rx,
1038 exec_evt_rx,
1039 exec_cmd_rx,
1040 signal_rx,
1041 signal_tx.clone(),
1042 );
1043
1044 let handle = runner.handle();
1045
1046 for _ in 0..10 {
1048 let quote = test_quote();
1049 data_evt_tx
1050 .send(DataEvent::Data(Data::Quote(quote)))
1051 .unwrap();
1052 }
1053
1054 let runner_task = tokio::spawn(async move {
1055 runner.run().await;
1056 });
1057
1058 tokio::time::sleep(Duration::from_millis(50)).await;
1060
1061 handle.stop();
1063
1064 let result = tokio::time::timeout(Duration::from_millis(200), runner_task).await;
1065 assert!(result.is_ok(), "Runner should process events and stop");
1066 }
1067}