1use std::{any::Any, fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19 live::runner::{set_data_event_sender, set_exec_event_sender},
20 messages::{
21 DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
22 },
23 msgbus::{self, switchboard::MessagingSwitchboard},
24 runner::{
25 DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
26 set_exec_cmd_sender, set_time_event_sender,
27 },
28 timer::TimeEventHandler,
29};
30
31#[derive(Debug)]
33pub struct AsyncDataCommandSender {
34 cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
35}
36
37impl AsyncDataCommandSender {
38 #[must_use]
39 pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>) -> Self {
40 Self { cmd_tx }
41 }
42}
43
44impl DataCommandSender for AsyncDataCommandSender {
45 fn execute(&self, command: DataCommand) {
46 if let Err(e) = self.cmd_tx.send(command) {
47 log::error!("Failed to send data command: {e}");
48 }
49 }
50}
51
52#[derive(Debug, Clone)]
54pub struct AsyncTimeEventSender {
55 time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>,
56}
57
58impl AsyncTimeEventSender {
59 #[must_use]
60 pub const fn new(time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>) -> Self {
61 Self { time_tx }
62 }
63
64 #[must_use]
69 pub fn get_channel_sender(&self) -> tokio::sync::mpsc::UnboundedSender<TimeEventHandler> {
70 self.time_tx.clone()
71 }
72}
73
74impl TimeEventSender for AsyncTimeEventSender {
75 fn send(&self, handler: TimeEventHandler) {
76 if let Err(e) = self.time_tx.send(handler) {
77 log::error!("Failed to send time event handler: {e}");
78 }
79 }
80}
81
82#[derive(Debug)]
84pub struct AsyncTradingCommandSender {
85 cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
86}
87
88impl AsyncTradingCommandSender {
89 #[must_use]
90 pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>) -> Self {
91 Self { cmd_tx }
92 }
93}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96 fn execute(&self, command: TradingCommand) {
97 if let Err(e) = self.cmd_tx.send(command) {
98 log::error!("Failed to send trading command: {e}");
99 }
100 }
101}
102
/// Minimal runner abstraction with a single synchronous entry point.
pub trait Runner {
    /// Drives the runner; takes `&mut self` so implementations may mutate their own state.
    fn run(&mut self);
}
106
107#[derive(Debug)]
112pub struct AsyncRunnerChannels {
113 pub time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
114 pub data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
115 pub data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
116 pub exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
117 pub exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
118}
119
120pub struct AsyncRunner {
121 channels: AsyncRunnerChannels,
122 signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
123 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
124}
125
126#[derive(Clone, Debug)]
128pub struct AsyncRunnerHandle {
129 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
130}
131
132impl AsyncRunnerHandle {
133 pub fn stop(&self) {
135 if let Err(e) = self.signal_tx.send(()) {
136 log::error!("Failed to send shutdown signal: {e}");
137 }
138 }
139}
140
141impl Default for AsyncRunner {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147impl Debug for AsyncRunner {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct(stringify!(AsyncRunner)).finish()
150 }
151}
152
153impl AsyncRunner {
154 #[must_use]
156 pub fn new() -> Self {
157 use tokio::sync::mpsc::unbounded_channel; let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandler>();
160 let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
161 let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
162 let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
163 let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
164 let (signal_tx, signal_rx) = unbounded_channel::<()>();
165
166 set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
167 set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
168 set_data_event_sender(data_evt_tx);
169 set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
170 set_exec_event_sender(exec_evt_tx);
171
172 Self {
173 channels: AsyncRunnerChannels {
174 time_evt_rx,
175 data_evt_rx,
176 data_cmd_rx,
177 exec_evt_rx,
178 exec_cmd_rx,
179 },
180 signal_rx,
181 signal_tx,
182 }
183 }
184
185 pub fn stop(&self) {
187 if let Err(e) = self.signal_tx.send(()) {
188 log::error!("Failed to send shutdown signal: {e}");
189 }
190 }
191
192 #[must_use]
194 pub fn handle(&self) -> AsyncRunnerHandle {
195 AsyncRunnerHandle {
196 signal_tx: self.signal_tx.clone(),
197 }
198 }
199
200 #[must_use]
205 pub fn take_channels(self) -> AsyncRunnerChannels {
206 self.channels
207 }
208
209 pub fn drain_pending_data_events(&mut self) {
211 let mut count = 0;
212 while let Ok(evt) = self.channels.data_evt_rx.try_recv() {
213 Self::handle_data_event(evt);
214 count += 1;
215 }
216 if count > 0 {
217 log::debug!("Drained {count} pending data events");
218 }
219 }
220
221 pub async fn run(&mut self) {
226 log::info!("AsyncRunner starting");
227
228 loop {
229 tokio::select! {
230 Some(()) = self.signal_rx.recv() => {
231 log::info!("AsyncRunner received signal, shutting down");
232 return;
233 },
234 Some(handler) = self.channels.time_evt_rx.recv() => {
235 Self::handle_time_event(handler);
236 },
237 Some(cmd) = self.channels.data_cmd_rx.recv() => {
238 Self::handle_data_command(cmd);
239 },
240 Some(evt) = self.channels.data_evt_rx.recv() => {
241 Self::handle_data_event(evt);
242 },
243 Some(cmd) = self.channels.exec_cmd_rx.recv() => {
244 Self::handle_exec_command(cmd);
245 },
246 Some(evt) = self.channels.exec_evt_rx.recv() => {
247 Self::handle_exec_event(evt);
248 },
249 else => {
250 log::debug!("AsyncRunner all channels closed, exiting");
251 return;
252 }
253 };
254 }
255 }
256
257 #[inline]
259 pub fn handle_time_event(handler: TimeEventHandler) {
260 handler.run();
261 }
262
263 #[inline]
265 pub fn handle_data_command(cmd: DataCommand) {
266 msgbus::send_any(MessagingSwitchboard::data_engine_execute(), &cmd);
267 }
268
269 #[inline]
271 pub fn handle_data_event(event: DataEvent) {
272 match event {
273 DataEvent::Data(data) => {
274 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
275 }
276 DataEvent::Instrument(data) => {
277 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
278 }
279 DataEvent::Response(resp) => {
280 msgbus::send_any(MessagingSwitchboard::data_engine_response(), &resp);
281 }
282 DataEvent::FundingRate(funding_rate) => {
283 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &funding_rate);
284 }
285 #[cfg(feature = "defi")]
286 DataEvent::DeFi(data) => {
287 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
288 }
289 }
290 }
291
292 #[inline]
294 pub fn handle_exec_command(cmd: TradingCommand) {
295 msgbus::send_any(MessagingSwitchboard::exec_engine_execute(), &cmd);
296 }
297
298 #[inline]
300 pub fn handle_exec_event(event: ExecutionEvent) {
301 match event {
302 ExecutionEvent::Order(ref order_event) => {
303 msgbus::send_any(MessagingSwitchboard::exec_engine_process(), order_event);
304 }
305 ExecutionEvent::Report(report) => {
306 Self::handle_exec_report(report);
307 }
308 ExecutionEvent::Account(ref account) => {
309 msgbus::send_any(MessagingSwitchboard::portfolio_update_account(), account);
310 }
311 }
312 }
313
314 #[inline]
315 pub fn handle_exec_report(report: ExecutionReport) {
316 let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
317 msgbus::send_any(endpoint, &report as &dyn Any);
318 }
319}
320
321#[cfg(test)]
322mod tests {
323 use std::time::Duration;
324
325 use nautilus_common::{
326 messages::{
327 ExecutionEvent, ExecutionReport,
328 data::{SubscribeCommand, SubscribeCustomData},
329 execution::TradingCommand,
330 },
331 timer::{TimeEvent, TimeEventCallback, TimeEventHandler},
332 };
333 use nautilus_core::{UUID4, UnixNanos};
334 use nautilus_model::{
335 data::{Data, DataType, quote::QuoteTick},
336 enums::{
337 AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
338 TimeInForce,
339 },
340 events::{OrderEvent, OrderEventAny, OrderSubmitted, account::state::AccountState},
341 identifiers::{
342 AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
343 TraderId, VenueOrderId,
344 },
345 reports::{FillReport, OrderStatusReport, PositionStatusReport},
346 types::{Money, Price, Quantity},
347 };
348 use rstest::rstest;
349 use ustr::Ustr;
350
351 use super::*;
352
353 fn test_quote() -> QuoteTick {
355 QuoteTick {
356 instrument_id: InstrumentId::from("EUR/USD.SIM"),
357 bid_price: Price::from("1.10000"),
358 ask_price: Price::from("1.10001"),
359 bid_size: Quantity::from(1_000_000),
360 ask_size: Quantity::from(1_000_000),
361 ts_event: UnixNanos::default(),
362 ts_init: UnixNanos::default(),
363 }
364 }
365
366 fn create_test_runner(
368 time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
369 data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
370 data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
371 exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
372 exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
373 signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
374 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
375 ) -> AsyncRunner {
376 AsyncRunner {
377 channels: AsyncRunnerChannels {
378 time_evt_rx,
379 data_evt_rx,
380 data_cmd_rx,
381 exec_evt_rx,
382 exec_cmd_rx,
383 },
384 signal_rx,
385 signal_tx,
386 }
387 }
388
389 #[rstest]
390 fn test_async_data_command_sender_creation() {
391 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
392 let sender = AsyncDataCommandSender::new(tx);
393 assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
394 }
395
396 #[rstest]
397 fn test_async_time_event_sender_creation() {
398 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
399 let sender = AsyncTimeEventSender::new(tx);
400 assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
401 }
402
403 #[rstest]
404 fn test_async_time_event_sender_get_channel() {
405 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
406 let sender = AsyncTimeEventSender::new(tx);
407 let channel = sender.get_channel_sender();
408
409 let event = TimeEvent::new(
411 Ustr::from("test"),
412 UUID4::new(),
413 UnixNanos::from(1),
414 UnixNanos::from(2),
415 );
416 let callback = TimeEventCallback::from(|_: TimeEvent| {});
417 let handler = TimeEventHandler::new(event, callback);
418
419 assert!(channel.send(handler).is_ok());
420 }
421
422 #[tokio::test]
423 async fn test_async_data_command_sender_execute() {
424 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
425 let sender = AsyncDataCommandSender::new(tx);
426
427 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
428 client_id: Some(ClientId::from("TEST")),
429 venue: None,
430 data_type: DataType::new("QuoteTick", None),
431 command_id: UUID4::new(),
432 ts_init: UnixNanos::default(),
433 correlation_id: None,
434 params: None,
435 }));
436
437 sender.execute(command.clone());
438
439 let received = rx.recv().await.unwrap();
440 match (received, command) {
441 (
442 DataCommand::Subscribe(SubscribeCommand::Data(r)),
443 DataCommand::Subscribe(SubscribeCommand::Data(c)),
444 ) => {
445 assert_eq!(r.client_id, c.client_id);
446 assert_eq!(r.data_type, c.data_type);
447 }
448 _ => panic!("Command mismatch"),
449 }
450 }
451
452 #[tokio::test]
453 async fn test_async_time_event_sender_send() {
454 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
455 let sender = AsyncTimeEventSender::new(tx);
456
457 let event = TimeEvent::new(
458 Ustr::from("test"),
459 UUID4::new(),
460 UnixNanos::from(1),
461 UnixNanos::from(2),
462 );
463 let callback = TimeEventCallback::from(|_: TimeEvent| {});
464 let handler = TimeEventHandler::new(event, callback);
465
466 sender.send(handler);
467
468 assert!(rx.recv().await.is_some());
469 }
470
471 #[tokio::test]
472 async fn test_runner_shutdown_signal() {
473 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
475 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
476 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
477 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
478 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
479 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
480
481 let mut runner = create_test_runner(
482 time_evt_rx,
483 data_evt_rx,
484 data_cmd_rx,
485 exec_evt_rx,
486 exec_cmd_rx,
487 signal_rx,
488 signal_tx.clone(),
489 );
490
491 let runner_handle = tokio::spawn(async move {
493 runner.run().await;
494 });
495
496 signal_tx.send(()).unwrap();
498
499 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
501 assert!(result.is_ok(), "Runner should stop on signal");
502 }
503
504 #[tokio::test]
505 async fn test_runner_closes_on_channel_drop() {
506 let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
507 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
508 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
509 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
510 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
511 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
512
513 let mut runner = create_test_runner(
514 time_evt_rx,
515 data_evt_rx,
516 data_cmd_rx,
517 exec_evt_rx,
518 exec_cmd_rx,
519 signal_rx,
520 signal_tx.clone(),
521 );
522
523 let runner_handle = tokio::spawn(async move {
525 runner.run().await;
526 });
527
528 drop(data_tx);
530
531 tokio::time::sleep(Duration::from_millis(50)).await;
533 signal_tx.send(()).ok();
534
535 let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
537 assert!(
538 result.is_ok(),
539 "Runner should stop when channels close or on signal"
540 );
541 }
542
543 #[tokio::test]
544 async fn test_concurrent_event_sending() {
545 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
546 let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
547 let (_time_evt_tx, time_evt_rx) =
548 tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
549 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
550 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
551 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
552
553 let mut runner = create_test_runner(
555 time_evt_rx,
556 data_evt_rx,
557 data_cmd_rx,
558 exec_evt_rx,
559 exec_cmd_rx,
560 signal_rx,
561 signal_tx.clone(),
562 );
563
564 let mut handles = vec![];
566 for _ in 0..5 {
567 let tx_clone = data_evt_tx.clone();
568 let handle = tokio::spawn(async move {
569 for _ in 0..20 {
570 let quote = test_quote();
571 tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
572 tokio::task::yield_now().await;
573 }
574 });
575 handles.push(handle);
576 }
577
578 let runner_handle = tokio::spawn(async move {
580 runner.run().await;
581 });
582
583 for handle in handles {
585 handle.await.unwrap();
586 }
587
588 tokio::time::sleep(Duration::from_millis(50)).await;
590
591 signal_tx.send(()).unwrap();
593
594 let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
595 }
596
597 #[rstest]
598 #[case(10)]
599 #[case(100)]
600 #[case(1000)]
601 fn test_channel_send_performance(#[case] count: usize) {
602 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
603 let quote = test_quote();
604
605 for _ in 0..count {
607 tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
608 }
609
610 let mut received = 0;
612 while rx.try_recv().is_ok() {
613 received += 1;
614 }
615
616 assert_eq!(received, count);
617 }
618
619 #[rstest]
620 fn test_async_trading_command_sender_creation() {
621 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
622 let sender = AsyncTradingCommandSender::new(tx);
623 assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
624 }
625
626 #[tokio::test]
627 async fn test_execution_event_order_channel() {
628 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
629
630 let event = OrderSubmitted::new(
631 TraderId::from("TRADER-001"),
632 StrategyId::from("S-001"),
633 InstrumentId::from("EUR/USD.SIM"),
634 ClientOrderId::from("O-001"),
635 AccountId::from("SIM-001"),
636 UUID4::new(),
637 UnixNanos::from(1),
638 UnixNanos::from(2),
639 );
640
641 tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
642 .unwrap();
643
644 let received = rx.recv().await.unwrap();
645 match received {
646 ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
647 assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
648 }
649 _ => panic!("Expected OrderSubmitted event"),
650 }
651 }
652
653 #[tokio::test]
654 async fn test_execution_report_order_status_channel() {
655 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
656
657 let report = OrderStatusReport::new(
658 AccountId::from("SIM-001"),
659 InstrumentId::from("EUR/USD.SIM"),
660 Some(ClientOrderId::from("O-001")),
661 VenueOrderId::from("V-001"),
662 OrderSide::Buy,
663 OrderType::Market,
664 TimeInForce::Gtc,
665 OrderStatus::Accepted,
666 Quantity::from(100_000),
667 Quantity::from(100_000),
668 UnixNanos::from(1),
669 UnixNanos::from(2),
670 UnixNanos::from(3),
671 None,
672 );
673
674 tx.send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
675 report,
676 ))))
677 .unwrap();
678
679 let received = rx.recv().await.unwrap();
680 match received {
681 ExecutionEvent::Report(ExecutionReport::Order(r)) => {
682 assert_eq!(r.venue_order_id.as_str(), "V-001");
683 assert_eq!(r.order_status, OrderStatus::Accepted);
684 }
685 _ => panic!("Expected OrderStatusReport"),
686 }
687 }
688
689 #[tokio::test]
690 async fn test_execution_report_fill() {
691 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
692
693 let report = FillReport::new(
694 AccountId::from("SIM-001"),
695 InstrumentId::from("EUR/USD.SIM"),
696 VenueOrderId::from("V-001"),
697 TradeId::from("T-001"),
698 OrderSide::Buy,
699 Quantity::from(100_000),
700 Price::from("1.10000"),
701 Money::from("10 USD"),
702 LiquiditySide::Taker,
703 Some(ClientOrderId::from("O-001")),
704 None,
705 UnixNanos::from(1),
706 UnixNanos::from(2),
707 None,
708 );
709
710 tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
711 report,
712 ))))
713 .unwrap();
714
715 let received = rx.recv().await.unwrap();
716 match received {
717 ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
718 assert_eq!(r.venue_order_id.as_str(), "V-001");
719 assert_eq!(r.trade_id.to_string(), "T-001");
720 }
721 _ => panic!("Expected FillReport"),
722 }
723 }
724
725 #[tokio::test]
726 async fn test_execution_report_position() {
727 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
728
729 let report = PositionStatusReport::new(
730 AccountId::from("SIM-001"),
731 InstrumentId::from("EUR/USD.SIM"),
732 PositionSideSpecified::Long,
733 Quantity::from(100_000),
734 UnixNanos::from(1),
735 UnixNanos::from(2),
736 None,
737 Some(PositionId::from("P-001")),
738 None,
739 );
740
741 tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
742 report,
743 ))))
744 .unwrap();
745
746 let received = rx.recv().await.unwrap();
747 match received {
748 ExecutionEvent::Report(ExecutionReport::Position(r)) => {
749 assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
750 }
751 _ => panic!("Expected PositionStatusReport"),
752 }
753 }
754
755 #[tokio::test]
756 async fn test_execution_event_account() {
757 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
758
759 let account_state = AccountState::new(
760 AccountId::from("SIM-001"),
761 AccountType::Cash,
762 vec![],
763 vec![],
764 true,
765 UUID4::new(),
766 UnixNanos::from(1),
767 UnixNanos::from(2),
768 None,
769 );
770
771 tx.send(ExecutionEvent::Account(account_state)).unwrap();
772
773 let received = rx.recv().await.unwrap();
774 match received {
775 ExecutionEvent::Account(r) => {
776 assert_eq!(r.account_id.as_str(), "SIM-001");
777 }
778 _ => panic!("Expected AccountState"),
779 }
780 }
781
782 #[tokio::test]
783 async fn test_runner_stop_method() {
784 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
785 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
786 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
787 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
788 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
789 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
790
791 let mut runner = create_test_runner(
792 time_evt_rx,
793 data_evt_rx,
794 data_cmd_rx,
795 exec_evt_rx,
796 exec_cmd_rx,
797 signal_rx,
798 signal_tx.clone(),
799 );
800
801 let runner_handle = tokio::spawn(async move {
802 runner.run().await;
803 });
804
805 signal_tx.send(()).unwrap();
807
808 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
809 assert!(result.is_ok(), "Runner should stop when stop() is called");
810 }
811
812 #[tokio::test]
813 async fn test_all_event_types_integration() {
814 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
815 let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
816 let (time_evt_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
817 let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
818 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
819 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
820
821 let mut runner = create_test_runner(
822 time_evt_rx,
823 data_evt_rx,
824 data_cmd_rx,
825 exec_evt_rx,
826 exec_cmd_rx,
827 signal_rx,
828 signal_tx.clone(),
829 );
830
831 let runner_handle = tokio::spawn(async move {
832 runner.run().await;
833 });
834
835 let quote = test_quote();
837 data_evt_tx
838 .send(DataEvent::Data(Data::Quote(quote)))
839 .unwrap();
840
841 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
843 client_id: Some(ClientId::from("TEST")),
844 venue: None,
845 data_type: DataType::new("QuoteTick", None),
846 command_id: UUID4::new(),
847 ts_init: UnixNanos::default(),
848 correlation_id: None,
849 params: None,
850 }));
851 data_cmd_tx.send(command).unwrap();
852
853 let event = TimeEvent::new(
855 Ustr::from("test"),
856 UUID4::new(),
857 UnixNanos::from(1),
858 UnixNanos::from(2),
859 );
860 let callback = TimeEventCallback::from(|_: TimeEvent| {});
861 let handler = TimeEventHandler::new(event, callback);
862 time_evt_tx.send(handler).unwrap();
863
864 let order_event = OrderSubmitted::new(
866 TraderId::from("TRADER-001"),
867 StrategyId::from("S-001"),
868 InstrumentId::from("EUR/USD.SIM"),
869 ClientOrderId::from("O-001"),
870 AccountId::from("SIM-001"),
871 UUID4::new(),
872 UnixNanos::from(1),
873 UnixNanos::from(2),
874 );
875 exec_evt_tx
876 .send(ExecutionEvent::Order(OrderEventAny::Submitted(order_event)))
877 .unwrap();
878
879 let order_status = OrderStatusReport::new(
881 AccountId::from("SIM-001"),
882 InstrumentId::from("EUR/USD.SIM"),
883 Some(ClientOrderId::from("O-001")),
884 VenueOrderId::from("V-001"),
885 OrderSide::Buy,
886 OrderType::Market,
887 TimeInForce::Gtc,
888 OrderStatus::Accepted,
889 Quantity::from(100_000),
890 Quantity::from(100_000),
891 UnixNanos::from(1),
892 UnixNanos::from(2),
893 UnixNanos::from(3),
894 None,
895 );
896 exec_evt_tx
897 .send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
898 order_status,
899 ))))
900 .unwrap();
901
902 let fill = FillReport::new(
904 AccountId::from("SIM-001"),
905 InstrumentId::from("EUR/USD.SIM"),
906 VenueOrderId::from("V-001"),
907 TradeId::from("T-001"),
908 OrderSide::Buy,
909 Quantity::from(100_000),
910 Price::from("1.10000"),
911 Money::from("10 USD"),
912 LiquiditySide::Taker,
913 Some(ClientOrderId::from("O-001")),
914 None,
915 UnixNanos::from(1),
916 UnixNanos::from(2),
917 None,
918 );
919 exec_evt_tx
920 .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
921 fill,
922 ))))
923 .unwrap();
924
925 let position = PositionStatusReport::new(
927 AccountId::from("SIM-001"),
928 InstrumentId::from("EUR/USD.SIM"),
929 PositionSideSpecified::Long,
930 Quantity::from(100_000),
931 UnixNanos::from(1),
932 UnixNanos::from(2),
933 None,
934 Some(PositionId::from("P-001")),
935 None,
936 );
937 exec_evt_tx
938 .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
939 position,
940 ))))
941 .unwrap();
942
943 let account_state = AccountState::new(
945 AccountId::from("SIM-001"),
946 AccountType::Cash,
947 vec![],
948 vec![],
949 true,
950 UUID4::new(),
951 UnixNanos::from(1),
952 UnixNanos::from(2),
953 None,
954 );
955 exec_evt_tx
956 .send(ExecutionEvent::Account(account_state))
957 .unwrap();
958
959 tokio::time::sleep(Duration::from_millis(100)).await;
961
962 signal_tx.send(()).unwrap();
964
965 let result = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
966 assert!(
967 result.is_ok(),
968 "Runner should process all event types and stop cleanly"
969 );
970 }
971
972 #[tokio::test]
973 async fn test_runner_handle_stops_runner() {
974 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
975 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
976 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
977 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
978 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
979 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
980
981 let mut runner = create_test_runner(
982 time_evt_rx,
983 data_evt_rx,
984 data_cmd_rx,
985 exec_evt_rx,
986 exec_cmd_rx,
987 signal_rx,
988 signal_tx.clone(),
989 );
990
991 let handle = runner.handle();
993
994 let runner_task = tokio::spawn(async move {
995 runner.run().await;
996 });
997
998 handle.stop();
1000
1001 let result = tokio::time::timeout(Duration::from_millis(100), runner_task).await;
1002 assert!(result.is_ok(), "Runner should stop via handle");
1003 }
1004
1005 #[tokio::test]
1006 async fn test_runner_handle_is_cloneable() {
1007 let (signal_tx, _signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1008 let handle = AsyncRunnerHandle { signal_tx };
1009
1010 let handle2 = handle.clone();
1011
1012 assert!(handle.signal_tx.send(()).is_ok());
1014 assert!(handle2.signal_tx.send(()).is_ok());
1015 }
1016
1017 #[tokio::test]
1018 async fn test_runner_processes_events_before_stop() {
1019 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1020 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1021 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1022 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1023 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1024 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1025
1026 let mut runner = create_test_runner(
1027 time_evt_rx,
1028 data_evt_rx,
1029 data_cmd_rx,
1030 exec_evt_rx,
1031 exec_cmd_rx,
1032 signal_rx,
1033 signal_tx.clone(),
1034 );
1035
1036 let handle = runner.handle();
1037
1038 for _ in 0..10 {
1040 let quote = test_quote();
1041 data_evt_tx
1042 .send(DataEvent::Data(Data::Quote(quote)))
1043 .unwrap();
1044 }
1045
1046 let runner_task = tokio::spawn(async move {
1047 runner.run().await;
1048 });
1049
1050 tokio::time::sleep(Duration::from_millis(50)).await;
1052
1053 handle.stop();
1055
1056 let result = tokio::time::timeout(Duration::from_millis(200), runner_task).await;
1057 assert!(result.is_ok(), "Runner should process events and stop");
1058 }
1059}