1use std::{fmt::Debug, sync::Arc};
17
18use nautilus_common::{
19 messages::{
20 DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
21 },
22 msgbus::{self, switchboard::MessagingSwitchboard},
23 runner::{
24 DataCommandSender, TimeEventSender, TradingCommandSender, set_data_cmd_sender,
25 set_data_event_sender, set_exec_cmd_sender, set_exec_event_sender, set_time_event_sender,
26 },
27 timer::TimeEventHandlerV2,
28};
29use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
30
31#[derive(Debug)]
33pub struct AsyncDataCommandSender {
34 cmd_tx: UnboundedSender<DataCommand>,
35}
36
37impl AsyncDataCommandSender {
38 #[must_use]
39 pub const fn new(cmd_tx: UnboundedSender<DataCommand>) -> Self {
40 Self { cmd_tx }
41 }
42}
43
44impl DataCommandSender for AsyncDataCommandSender {
45 fn execute(&self, command: DataCommand) {
46 if let Err(e) = self.cmd_tx.send(command) {
47 log::error!("Failed to send data command: {e}");
48 }
49 }
50}
51
52#[derive(Debug, Clone)]
54pub struct AsyncTimeEventSender {
55 time_tx: UnboundedSender<TimeEventHandlerV2>,
56}
57
58impl AsyncTimeEventSender {
59 #[must_use]
60 pub const fn new(time_tx: UnboundedSender<TimeEventHandlerV2>) -> Self {
61 Self { time_tx }
62 }
63
64 #[must_use]
69 pub fn get_channel_sender(&self) -> UnboundedSender<TimeEventHandlerV2> {
70 self.time_tx.clone()
71 }
72}
73
74impl TimeEventSender for AsyncTimeEventSender {
75 fn send(&self, handler: TimeEventHandlerV2) {
76 if let Err(e) = self.time_tx.send(handler) {
77 log::error!("Failed to send time event handler: {e}");
78 }
79 }
80}
81
82#[derive(Debug)]
84pub struct AsyncTradingCommandSender {
85 cmd_tx: UnboundedSender<TradingCommand>,
86}
87
88impl AsyncTradingCommandSender {
89 #[must_use]
90 pub const fn new(cmd_tx: UnboundedSender<TradingCommand>) -> Self {
91 Self { cmd_tx }
92 }
93}
94
95impl TradingCommandSender for AsyncTradingCommandSender {
96 fn execute(&self, command: TradingCommand) {
97 if let Err(e) = self.cmd_tx.send(command) {
98 log::error!("Failed to send trading command: {e}");
99 }
100 }
101}
102
103pub trait Runner {
104 fn run(&mut self);
105}
106
107pub struct AsyncRunner {
108 time_evt_rx: UnboundedReceiver<TimeEventHandlerV2>,
109 data_evt_rx: UnboundedReceiver<DataEvent>,
110 data_cmd_rx: UnboundedReceiver<DataCommand>,
111 exec_evt_rx: UnboundedReceiver<ExecutionEvent>,
112 exec_cmd_rx: UnboundedReceiver<TradingCommand>,
113 signal_rx: UnboundedReceiver<()>,
114 signal_tx: UnboundedSender<()>,
115}
116
117impl Default for AsyncRunner {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl Debug for AsyncRunner {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.debug_struct(stringify!(AsyncRunner)).finish()
126 }
127}
128
129impl AsyncRunner {
130 #[must_use]
131 pub fn new() -> Self {
132 let (time_evt_tx, time_evt_rx) =
133 tokio::sync::mpsc::unbounded_channel::<TimeEventHandlerV2>();
134 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
135 let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
136 let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
137 let (exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
138 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
139
140 set_time_event_sender(Arc::new(AsyncTimeEventSender::new(time_evt_tx)));
141 set_data_event_sender(data_evt_tx);
142 set_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(data_cmd_tx)));
143 set_exec_event_sender(exec_evt_tx);
144 set_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(exec_cmd_tx)));
145
146 Self {
147 time_evt_rx,
148 data_evt_rx,
149 data_cmd_rx,
150 exec_evt_rx,
151 exec_cmd_rx,
152 signal_rx,
153 signal_tx,
154 }
155 }
156
157 pub fn stop(&self) {
159 if let Err(e) = self.signal_tx.send(()) {
160 log::error!("Failed to send shutdown signal: {e}");
161 }
162 }
163}
164
165impl AsyncRunner {
166 pub async fn run(&mut self) {
171 log::info!("Starting AsyncRunner");
172
173 let data_engine_process = MessagingSwitchboard::data_engine_process();
174 let data_engine_response = MessagingSwitchboard::data_engine_response();
175 let data_engine_execute = MessagingSwitchboard::data_engine_execute();
176 let exec_engine_process = MessagingSwitchboard::exec_engine_process();
177 let exec_engine_execute = MessagingSwitchboard::exec_engine_execute();
178
179 loop {
180 tokio::select! {
181 Some(event) = self.data_evt_rx.recv() => {
182 match event {
183 DataEvent::Data(data) => msgbus::send_any(data_engine_process, &data),
184 DataEvent::Response(resp) => {
185 msgbus::send_any(data_engine_response, &resp);
186 }
187 #[cfg(feature = "defi")]
188 DataEvent::DeFi(data) => msgbus::send_any(data_engine_process, &data),
189 }
190 },
191 Some(handler) = self.time_evt_rx.recv() => {
192 handler.run();
193 },
194 Some(cmd) = self.data_cmd_rx.recv() => {
195 msgbus::send_any(data_engine_execute, &cmd);
196 },
197 Some(event) = self.exec_evt_rx.recv() => {
198 match event {
199 ExecutionEvent::Order(order_event) => {
200 msgbus::send_any(exec_engine_process, &order_event);
201 }
202 ExecutionEvent::Report(report) => {
203 match report {
204 ExecutionReport::OrderStatus(r) => {
205 msgbus::send_any("ExecEngine.reconcile_execution_report".into(), &*r);
206 }
207 ExecutionReport::Fill(r) => {
208 msgbus::send_any("ExecEngine.reconcile_execution_report".into(), &*r);
209 }
210 ExecutionReport::Position(r) => {
211 msgbus::send_any("ExecEngine.reconcile_execution_report".into(), &*r);
212 }
213 ExecutionReport::Mass(r) => {
214 msgbus::send_any("ExecEngine.reconcile_execution_mass_status".into(), &*r);
215 }
216 }
217 }
218 }
219 },
220 Some(cmd) = self.exec_cmd_rx.recv() => {
221 msgbus::send_any(exec_engine_execute, &cmd);
222 },
223 Some(()) = self.signal_rx.recv() => {
224 tracing::info!("AsyncRunner received signal, shutting down");
225 return; },
227 else => return, };
229 }
230 }
231}
232
233#[cfg(test)]
238mod tests {
239 use std::time::Duration;
240
241 use nautilus_common::{
242 messages::{
243 ExecutionEvent, ExecutionReport,
244 data::{SubscribeCommand, SubscribeCustomData},
245 execution::TradingCommand,
246 },
247 timer::{TimeEvent, TimeEventCallback, TimeEventHandlerV2},
248 };
249 use nautilus_core::{UUID4, UnixNanos};
250 use nautilus_model::{
251 data::{Data, quote::QuoteTick},
252 enums::OrderSide,
253 events::{OrderEvent, OrderSubmitted},
254 identifiers::{
255 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
256 },
257 types::{Price, Quantity},
258 };
259 use rstest::rstest;
260 use tokio::sync::mpsc;
261 use ustr::Ustr;
262
263 use super::*;
264
265 fn test_quote() -> QuoteTick {
267 QuoteTick {
268 instrument_id: InstrumentId::from("EUR/USD.SIM"),
269 bid_price: Price::from("1.10000"),
270 ask_price: Price::from("1.10001"),
271 bid_size: Quantity::from(1_000_000),
272 ask_size: Quantity::from(1_000_000),
273 ts_event: UnixNanos::default(),
274 ts_init: UnixNanos::default(),
275 }
276 }
277
278 #[rstest]
279 fn test_async_data_command_sender_creation() {
280 let (tx, _rx) = mpsc::unbounded_channel();
281 let sender = AsyncDataCommandSender::new(tx);
282 assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
283 }
284
285 #[rstest]
286 fn test_async_time_event_sender_creation() {
287 let (tx, _rx) = mpsc::unbounded_channel();
288 let sender = AsyncTimeEventSender::new(tx);
289 assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
290 }
291
292 #[rstest]
293 fn test_async_time_event_sender_get_channel() {
294 let (tx, _rx) = mpsc::unbounded_channel();
295 let sender = AsyncTimeEventSender::new(tx);
296 let channel = sender.get_channel_sender();
297
298 let event = TimeEvent::new(
300 Ustr::from("test"),
301 UUID4::new(),
302 UnixNanos::from(1),
303 UnixNanos::from(2),
304 );
305 let callback = TimeEventCallback::from(|_: TimeEvent| {});
306 let handler = TimeEventHandlerV2::new(event, callback);
307
308 assert!(channel.send(handler).is_ok());
309 }
310
311 #[tokio::test]
312 async fn test_async_data_command_sender_execute() {
313 let (tx, mut rx) = mpsc::unbounded_channel();
314 let sender = AsyncDataCommandSender::new(tx);
315
316 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
317 client_id: Some(ClientId::from("TEST")),
318 venue: None,
319 data_type: nautilus_model::data::DataType::new("QuoteTick", None),
320 command_id: UUID4::new(),
321 ts_init: UnixNanos::default(),
322 params: None,
323 }));
324
325 sender.execute(command.clone());
326
327 let received = rx.recv().await.unwrap();
328 match (received, command) {
329 (
330 DataCommand::Subscribe(SubscribeCommand::Data(r)),
331 DataCommand::Subscribe(SubscribeCommand::Data(c)),
332 ) => {
333 assert_eq!(r.client_id, c.client_id);
334 assert_eq!(r.data_type, c.data_type);
335 }
336 _ => panic!("Command mismatch"),
337 }
338 }
339
340 #[tokio::test]
341 async fn test_async_time_event_sender_send() {
342 let (tx, mut rx) = mpsc::unbounded_channel();
343 let sender = AsyncTimeEventSender::new(tx);
344
345 let event = TimeEvent::new(
346 Ustr::from("test"),
347 UUID4::new(),
348 UnixNanos::from(1),
349 UnixNanos::from(2),
350 );
351 let callback = TimeEventCallback::from(|_: TimeEvent| {});
352 let handler = TimeEventHandlerV2::new(event, callback);
353
354 sender.send(handler);
355
356 assert!(rx.recv().await.is_some());
357 }
358
359 #[tokio::test]
360 async fn test_runner_shutdown_signal() {
361 let (_data_tx, data_evt_rx) = mpsc::unbounded_channel::<DataEvent>();
363 let (_cmd_tx, data_cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
364 let (_time_tx, time_evt_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
365 let (_exec_evt_tx, exec_evt_rx) = mpsc::unbounded_channel::<ExecutionEvent>();
366 let (_exec_cmd_tx, exec_cmd_rx) = mpsc::unbounded_channel::<TradingCommand>();
367 let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();
368
369 let mut runner = AsyncRunner {
370 data_evt_rx,
371 data_cmd_rx,
372 time_evt_rx,
373 exec_evt_rx,
374 exec_cmd_rx,
375 signal_rx,
376 signal_tx: signal_tx.clone(),
377 };
378
379 let runner_handle = tokio::spawn(async move {
381 runner.run().await;
382 });
383
384 signal_tx.send(()).unwrap();
386
387 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
389 assert!(result.is_ok(), "Runner should stop on signal");
390 }
391
392 #[tokio::test]
393 async fn test_runner_closes_on_channel_drop() {
394 let (data_tx, data_evt_rx) = mpsc::unbounded_channel::<DataEvent>();
395 let (_cmd_tx, data_cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
396 let (_time_tx, time_evt_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
397 let (_exec_evt_tx, exec_evt_rx) = mpsc::unbounded_channel::<ExecutionEvent>();
398 let (_exec_cmd_tx, exec_cmd_rx) = mpsc::unbounded_channel::<TradingCommand>();
399 let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();
400
401 let mut runner = AsyncRunner {
402 data_evt_rx,
403 data_cmd_rx,
404 time_evt_rx,
405 exec_evt_rx,
406 exec_cmd_rx,
407 signal_rx,
408 signal_tx: signal_tx.clone(),
409 };
410
411 let runner_handle = tokio::spawn(async move {
413 runner.run().await;
414 });
415
416 drop(data_tx);
418
419 tokio::time::sleep(Duration::from_millis(50)).await;
421 signal_tx.send(()).ok();
422
423 let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
425 assert!(
426 result.is_ok(),
427 "Runner should stop when channels close or on signal"
428 );
429 }
430
431 #[tokio::test]
432 async fn test_concurrent_event_sending() {
433 let (data_evt_tx, data_evt_rx) = mpsc::unbounded_channel::<DataEvent>();
434 let (_data_cmd_tx, data_cmd_rx) = mpsc::unbounded_channel::<DataCommand>();
435 let (_time_evt_tx, time_evt_rx) = mpsc::unbounded_channel::<TimeEventHandlerV2>();
436 let (_exec_evt_tx, exec_evt_rx) = mpsc::unbounded_channel::<ExecutionEvent>();
437 let (_exec_cmd_tx, exec_cmd_rx) = mpsc::unbounded_channel::<TradingCommand>();
438 let (signal_tx, signal_rx) = mpsc::unbounded_channel::<()>();
439
440 let mut runner = AsyncRunner {
442 time_evt_rx,
443 data_evt_rx,
444 data_cmd_rx,
445 exec_evt_rx,
446 exec_cmd_rx,
447 signal_rx,
448 signal_tx: signal_tx.clone(),
449 };
450
451 let mut handles = vec![];
453 for _ in 0..5 {
454 let tx_clone = data_evt_tx.clone();
455 let handle = tokio::spawn(async move {
456 for _ in 0..20 {
457 let quote = test_quote();
458 tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
459 tokio::task::yield_now().await;
460 }
461 });
462 handles.push(handle);
463 }
464
465 let runner_handle = tokio::spawn(async move {
467 runner.run().await;
468 });
469
470 for handle in handles {
472 handle.await.unwrap();
473 }
474
475 tokio::time::sleep(Duration::from_millis(50)).await;
477
478 signal_tx.send(()).unwrap();
480
481 let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
482 }
483
484 #[rstest]
485 #[case(10)]
486 #[case(100)]
487 #[case(1000)]
488 fn test_channel_send_performance(#[case] count: usize) {
489 let (tx, mut rx) = mpsc::unbounded_channel::<DataEvent>();
490 let quote = test_quote();
491
492 for _ in 0..count {
494 tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
495 }
496
497 let mut received = 0;
499 while rx.try_recv().is_ok() {
500 received += 1;
501 }
502
503 assert_eq!(received, count);
504 }
505
506 #[rstest]
507 fn test_async_trading_command_sender_creation() {
508 let (tx, _rx) = mpsc::unbounded_channel();
509 let sender = AsyncTradingCommandSender::new(tx);
510 assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
511 }
512
513 #[tokio::test]
514 async fn test_execution_event_order_channel() {
515 let (tx, mut rx) = mpsc::unbounded_channel::<ExecutionEvent>();
516
517 let event = OrderSubmitted::new(
518 TraderId::from("TRADER-001"),
519 StrategyId::from("S-001"),
520 InstrumentId::from("EUR/USD.SIM"),
521 ClientOrderId::from("O-001"),
522 AccountId::from("SIM-001"),
523 UUID4::new(),
524 UnixNanos::from(1),
525 UnixNanos::from(2),
526 );
527
528 tx.send(ExecutionEvent::Order(
529 nautilus_model::events::OrderEventAny::Submitted(event),
530 ))
531 .unwrap();
532
533 let received = rx.recv().await.unwrap();
534 match received {
535 ExecutionEvent::Order(nautilus_model::events::OrderEventAny::Submitted(e)) => {
536 assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
537 }
538 _ => panic!("Expected OrderSubmitted event"),
539 }
540 }
541
542 #[tokio::test]
543 async fn test_execution_report_order_status_channel() {
544 use nautilus_model::{
545 enums::{OrderStatus, OrderType, TimeInForce},
546 reports::OrderStatusReport,
547 };
548
549 let (tx, mut rx) = mpsc::unbounded_channel::<ExecutionEvent>();
550
551 let report = OrderStatusReport::new(
552 AccountId::from("SIM-001"),
553 InstrumentId::from("EUR/USD.SIM"),
554 Some(ClientOrderId::from("O-001")),
555 VenueOrderId::from("V-001"),
556 OrderSide::Buy,
557 OrderType::Market,
558 TimeInForce::Gtc,
559 OrderStatus::Accepted,
560 Quantity::from(100_000),
561 Quantity::from(100_000),
562 UnixNanos::from(1),
563 UnixNanos::from(2),
564 UnixNanos::from(3),
565 None,
566 );
567
568 tx.send(ExecutionEvent::Report(ExecutionReport::OrderStatus(
569 Box::new(report),
570 )))
571 .unwrap();
572
573 let received = rx.recv().await.unwrap();
574 match received {
575 ExecutionEvent::Report(ExecutionReport::OrderStatus(r)) => {
576 assert_eq!(r.venue_order_id.as_str(), "V-001");
577 assert_eq!(r.order_status, OrderStatus::Accepted);
578 }
579 _ => panic!("Expected OrderStatusReport"),
580 }
581 }
582}