1use std::{
32 collections::{HashMap, VecDeque},
33 fmt::Debug,
34 sync::{
35 Arc,
36 atomic::{AtomicBool, Ordering},
37 },
38 time::Duration,
39};
40
41use bytes::Bytes;
42use futures::stream::Stream;
43use nautilus_common::{
44 live::get_runtime,
45 logging::{log_task_error, log_task_started, log_task_stopped},
46 msgbus::{
47 BusMessage,
48 database::{DatabaseConfig, MessageBusConfig, MessageBusDatabaseAdapter},
49 switchboard::CLOSE_TOPIC,
50 },
51};
52use nautilus_core::{
53 UUID4,
54 time::{duration_since_unix_epoch, get_atomic_clock_realtime},
55};
56use nautilus_cryptography::providers::install_cryptographic_provider;
57use nautilus_model::identifiers::TraderId;
58use redis::{AsyncCommands, streams};
59use streams::StreamReadOptions;
60use ustr::Ustr;
61
62use super::{REDIS_MINID, REDIS_XTRIM, await_handle};
63use crate::redis::{create_redis_connection, get_stream_key};
64
65const MSGBUS_PUBLISH: &str = "msgbus-publish";
66const MSGBUS_STREAM: &str = "msgbus-stream";
67const MSGBUS_HEARTBEAT: &str = "msgbus-heartbeat";
68const HEARTBEAT_TOPIC: &str = "health:heartbeat";
69const TRIM_BUFFER_SECS: u64 = 60;
70
71type RedisStreamBulk = Vec<HashMap<String, Vec<HashMap<String, redis::Value>>>>;
72
73#[cfg_attr(
74 feature = "python",
75 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
76)]
77pub struct RedisMessageBusDatabase {
78 pub trader_id: TraderId,
80 pub instance_id: UUID4,
82 pub_tx: tokio::sync::mpsc::UnboundedSender<BusMessage>,
83 pub_handle: Option<tokio::task::JoinHandle<()>>,
84 stream_rx: Option<tokio::sync::mpsc::Receiver<BusMessage>>,
85 stream_handle: Option<tokio::task::JoinHandle<()>>,
86 stream_signal: Arc<AtomicBool>,
87 heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
88 heartbeat_signal: Arc<AtomicBool>,
89}
90
91impl Debug for RedisMessageBusDatabase {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct(stringify!(RedisMessageBusDatabase))
94 .field("trader_id", &self.trader_id)
95 .field("instance_id", &self.instance_id)
96 .finish()
97 }
98}
99
100impl MessageBusDatabaseAdapter for RedisMessageBusDatabase {
101 type DatabaseType = Self;
102
103 fn new(
111 trader_id: TraderId,
112 instance_id: UUID4,
113 config: MessageBusConfig,
114 ) -> anyhow::Result<Self> {
115 install_cryptographic_provider();
116
117 let config_clone = config.clone();
118 let db_config = config
119 .database
120 .clone()
121 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
122
123 let (pub_tx, pub_rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();
124
125 let pub_handle = Some(get_runtime().spawn(async move {
127 if let Err(e) = publish_messages(pub_rx, trader_id, instance_id, config_clone).await {
128 log_task_error(MSGBUS_PUBLISH, &e);
129 }
130 }));
131
132 let external_streams = config.external_streams.clone().unwrap_or_default();
134 let stream_signal = Arc::new(AtomicBool::new(false));
135 let (stream_rx, stream_handle) = if external_streams.is_empty() {
136 (None, None)
137 } else {
138 let stream_signal_clone = stream_signal.clone();
139 let (stream_tx, stream_rx) = tokio::sync::mpsc::channel::<BusMessage>(100_000);
140 (
141 Some(stream_rx),
142 Some(get_runtime().spawn(async move {
143 if let Err(e) =
144 stream_messages(stream_tx, db_config, external_streams, stream_signal_clone)
145 .await
146 {
147 log_task_error(MSGBUS_STREAM, &e);
148 }
149 })),
150 )
151 };
152
153 let heartbeat_signal = Arc::new(AtomicBool::new(false));
155 let heartbeat_handle = if let Some(heartbeat_interval_secs) = config.heartbeat_interval_secs
156 {
157 let signal = heartbeat_signal.clone();
158 let pub_tx_clone = pub_tx.clone();
159
160 Some(get_runtime().spawn(async move {
161 run_heartbeat(heartbeat_interval_secs, signal, pub_tx_clone).await;
162 }))
163 } else {
164 None
165 };
166
167 Ok(Self {
168 trader_id,
169 instance_id,
170 pub_tx,
171 pub_handle,
172 stream_rx,
173 stream_handle,
174 stream_signal,
175 heartbeat_handle,
176 heartbeat_signal,
177 })
178 }
179
180 fn is_closed(&self) -> bool {
182 self.pub_tx.is_closed()
183 }
184
185 fn publish(&self, topic: Ustr, payload: Bytes) {
187 let msg = BusMessage::new(topic, payload);
188 if let Err(e) = self.pub_tx.send(msg) {
189 log::error!("Failed to send message: {e}");
190 }
191 }
192
193 fn close(&mut self) {
195 log::debug!("Closing");
196
197 self.stream_signal.store(true, Ordering::Relaxed);
198 self.heartbeat_signal.store(true, Ordering::Relaxed);
199
200 if !self.pub_tx.is_closed() {
201 let msg = BusMessage::new_close();
202
203 if let Err(e) = self.pub_tx.send(msg) {
204 log::error!("Failed to send close message: {e:?}");
205 }
206 }
207
208 tokio::task::block_in_place(|| {
210 get_runtime().block_on(async {
211 self.close_async().await;
212 });
213 });
214
215 log::debug!("Closed");
216 }
217}
218
219impl RedisMessageBusDatabase {
220 pub fn get_stream_receiver(
226 &mut self,
227 ) -> anyhow::Result<tokio::sync::mpsc::Receiver<BusMessage>> {
228 self.stream_rx
229 .take()
230 .ok_or_else(|| anyhow::anyhow!("Stream receiver already taken"))
231 }
232
233 pub fn stream(
235 mut stream_rx: tokio::sync::mpsc::Receiver<BusMessage>,
236 ) -> impl Stream<Item = BusMessage> + 'static {
237 async_stream::stream! {
238 while let Some(msg) = stream_rx.recv().await {
239 yield msg;
240 }
241 }
242 }
243
244 pub async fn close_async(&mut self) {
245 await_handle(self.pub_handle.take(), MSGBUS_PUBLISH).await;
246 await_handle(self.stream_handle.take(), MSGBUS_STREAM).await;
247 await_handle(self.heartbeat_handle.take(), MSGBUS_HEARTBEAT).await;
248 }
249}
250
251pub async fn publish_messages(
260 mut rx: tokio::sync::mpsc::UnboundedReceiver<BusMessage>,
261 trader_id: TraderId,
262 instance_id: UUID4,
263 config: MessageBusConfig,
264) -> anyhow::Result<()> {
265 log_task_started(MSGBUS_PUBLISH);
266
267 let db_config = config
268 .database
269 .as_ref()
270 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
271 let mut con = create_redis_connection(MSGBUS_PUBLISH, db_config.clone()).await?;
272 let stream_key = get_stream_key(trader_id, instance_id, &config);
273
274 let autotrim_duration = config
276 .autotrim_mins
277 .filter(|&mins| mins > 0)
278 .map(|mins| Duration::from_secs(u64::from(mins) * 60));
279 let mut last_trim_index: HashMap<String, usize> = HashMap::new();
280
281 let mut buffer: VecDeque<BusMessage> = VecDeque::new();
283 let buffer_interval = Duration::from_millis(u64::from(config.buffer_interval_ms.unwrap_or(0)));
284
285 let flush_timer = tokio::time::sleep(buffer_interval);
289 tokio::pin!(flush_timer);
290
291 loop {
292 tokio::select! {
293 maybe_msg = rx.recv() => {
294 if let Some(msg) = maybe_msg {
295 if msg.topic == CLOSE_TOPIC {
296 log::debug!("Received close message");
297 if !buffer.is_empty() {
299 drain_buffer(
300 &mut con,
301 &stream_key,
302 config.stream_per_topic,
303 autotrim_duration,
304 &mut last_trim_index,
305 &mut buffer,
306 ).await?;
307 }
308 break;
309 }
310
311 buffer.push_back(msg);
312
313 if buffer_interval.is_zero() {
314 drain_buffer(
316 &mut con,
317 &stream_key,
318 config.stream_per_topic,
319 autotrim_duration,
320 &mut last_trim_index,
321 &mut buffer,
322 ).await?;
323 }
324 } else {
325 log::debug!("Channel hung up");
326 break;
327 }
328 }
329 () = &mut flush_timer, if !buffer_interval.is_zero() => {
332 if !buffer.is_empty() {
333 drain_buffer(
334 &mut con,
335 &stream_key,
336 config.stream_per_topic,
337 autotrim_duration,
338 &mut last_trim_index,
339 &mut buffer,
340 ).await?;
341 }
342
343 flush_timer.as_mut().reset(tokio::time::Instant::now() + buffer_interval);
345 }
346 }
347 }
348
349 if !buffer.is_empty() {
351 drain_buffer(
352 &mut con,
353 &stream_key,
354 config.stream_per_topic,
355 autotrim_duration,
356 &mut last_trim_index,
357 &mut buffer,
358 )
359 .await?;
360 }
361
362 log_task_stopped(MSGBUS_PUBLISH);
363 Ok(())
364}
365
366async fn drain_buffer(
367 conn: &mut redis::aio::ConnectionManager,
368 stream_key: &str,
369 stream_per_topic: bool,
370 autotrim_duration: Option<Duration>,
371 last_trim_index: &mut HashMap<String, usize>,
372 buffer: &mut VecDeque<BusMessage>,
373) -> anyhow::Result<()> {
374 let mut pipe = redis::pipe();
375 pipe.atomic();
376
377 for msg in buffer.drain(..) {
378 let items: Vec<(&str, &[u8])> = vec![
379 ("topic", msg.topic.as_ref()),
380 ("payload", msg.payload.as_ref()),
381 ];
382 let stream_key = if stream_per_topic {
383 format!("{stream_key}:{}", &msg.topic)
384 } else {
385 stream_key.to_string()
386 };
387 pipe.xadd(&stream_key, "*", &items);
388
389 if autotrim_duration.is_none() {
390 continue; }
392
393 let last_trim_ms = last_trim_index.entry(stream_key.clone()).or_insert(0); let unix_duration_now = duration_since_unix_epoch();
396 let trim_buffer = Duration::from_secs(TRIM_BUFFER_SECS);
397
398 if *last_trim_ms < (unix_duration_now - trim_buffer).as_millis() as usize {
400 let min_timestamp_ms =
401 (unix_duration_now - autotrim_duration.unwrap()).as_millis() as usize;
402 let result: Result<(), redis::RedisError> = redis::cmd(REDIS_XTRIM)
403 .arg(stream_key.clone())
404 .arg(REDIS_MINID)
405 .arg(min_timestamp_ms)
406 .query_async(conn)
407 .await;
408
409 if let Err(e) = result {
410 log::error!("Error trimming stream '{stream_key}': {e}");
411 } else {
412 last_trim_index.insert(stream_key.clone(), unix_duration_now.as_millis() as usize);
413 }
414 }
415 }
416
417 pipe.query_async(conn).await.map_err(anyhow::Error::from)
418}
419
420pub async fn stream_messages(
428 tx: tokio::sync::mpsc::Sender<BusMessage>,
429 config: DatabaseConfig,
430 stream_keys: Vec<String>,
431 stream_signal: Arc<AtomicBool>,
432) -> anyhow::Result<()> {
433 log_task_started(MSGBUS_STREAM);
434
435 let mut con = create_redis_connection(MSGBUS_STREAM, config).await?;
436
437 let stream_keys = &stream_keys
438 .iter()
439 .map(String::as_str)
440 .collect::<Vec<&str>>();
441
442 log::debug!("Listening to streams: [{}]", stream_keys.join(", "));
443
444 let clock = get_atomic_clock_realtime();
446 let timestamp_ms = clock.get_time_ms();
447 let initial_id = timestamp_ms.to_string();
448
449 let mut last_ids: HashMap<String, String> = stream_keys
450 .iter()
451 .map(|&key| (key.to_string(), initial_id.clone()))
452 .collect();
453
454 let opts = StreamReadOptions::default().block(100);
455
456 'outer: loop {
457 if stream_signal.load(Ordering::Relaxed) {
458 log::debug!("Received streaming terminate signal");
459 break;
460 }
461
462 let ids: Vec<String> = stream_keys
463 .iter()
464 .map(|&key| last_ids[key].clone())
465 .collect();
466 let id_refs: Vec<&str> = ids.iter().map(String::as_str).collect();
467
468 let result: Result<RedisStreamBulk, _> =
469 con.xread_options(&[&stream_keys], &[&id_refs], &opts).await;
470 match result {
471 Ok(stream_bulk) => {
472 if stream_bulk.is_empty() {
473 continue;
475 }
476 for entry in &stream_bulk {
477 for (stream_key, stream_msgs) in entry {
478 for stream_msg in stream_msgs {
479 for (id, array) in stream_msg {
480 last_ids.insert(stream_key.clone(), id.clone());
481
482 match decode_bus_message(array) {
483 Ok(msg) => {
484 if let Err(e) = tx.send(msg).await {
485 log::debug!("Channel closed: {e:?}");
486 break 'outer; }
488 }
489 Err(e) => {
490 log::error!("{e:?}");
491 continue;
492 }
493 }
494 }
495 }
496 }
497 }
498 }
499 Err(e) => {
500 anyhow::bail!("Error reading from stream: {e:?}");
501 }
502 }
503 }
504
505 log_task_stopped(MSGBUS_STREAM);
506 Ok(())
507}
508
509fn decode_bus_message(stream_msg: &redis::Value) -> anyhow::Result<BusMessage> {
518 if let redis::Value::Array(stream_msg) = stream_msg {
519 if stream_msg.len() < 4 {
520 anyhow::bail!("Invalid stream message format: {stream_msg:?}");
521 }
522
523 let topic = match &stream_msg[1] {
524 redis::Value::BulkString(bytes) => match String::from_utf8(bytes.clone()) {
525 Ok(topic) => topic,
526 Err(e) => anyhow::bail!("Error parsing topic: {e}"),
527 },
528 _ => {
529 anyhow::bail!("Invalid topic format: {stream_msg:?}");
530 }
531 };
532
533 let payload = match &stream_msg[3] {
534 redis::Value::BulkString(bytes) => Bytes::copy_from_slice(bytes),
535 _ => {
536 anyhow::bail!("Invalid payload format: {stream_msg:?}");
537 }
538 };
539
540 Ok(BusMessage::with_str_topic(topic, payload))
541 } else {
542 anyhow::bail!("Invalid stream message format: {stream_msg:?}")
543 }
544}
545
546async fn run_heartbeat(
547 heartbeat_interval_secs: u16,
548 signal: Arc<AtomicBool>,
549 pub_tx: tokio::sync::mpsc::UnboundedSender<BusMessage>,
550) {
551 log_task_started("heartbeat");
552 log::debug!("Heartbeat at {heartbeat_interval_secs} second intervals");
553
554 let heartbeat_interval = Duration::from_secs(u64::from(heartbeat_interval_secs));
555 let heartbeat_timer = tokio::time::interval(heartbeat_interval);
556
557 let check_interval = Duration::from_millis(100);
558 let check_timer = tokio::time::interval(check_interval);
559
560 tokio::pin!(heartbeat_timer);
561 tokio::pin!(check_timer);
562
563 loop {
564 if signal.load(Ordering::Relaxed) {
565 log::debug!("Received heartbeat terminate signal");
566 break;
567 }
568
569 tokio::select! {
570 _ = heartbeat_timer.tick() => {
571 let heartbeat = create_heartbeat_msg();
572 if let Err(e) = pub_tx.send(heartbeat) {
573 log::debug!("Error sending heartbeat: {e}");
575 }
576 },
577 _ = check_timer.tick() => {}
578 }
579 }
580
581 log_task_stopped("heartbeat");
582}
583
584fn create_heartbeat_msg() -> BusMessage {
585 let payload = Bytes::from(chrono::Utc::now().to_rfc3339().into_bytes());
586 BusMessage::with_str_topic(HEARTBEAT_TOPIC, payload)
587}
588
589#[cfg(test)]
590mod tests {
591 use redis::Value;
592 use rstest::*;
593
594 use super::*;
595
596 #[rstest]
597 fn test_decode_bus_message_valid() {
598 let stream_msg = Value::Array(vec![
599 Value::BulkString(b"0".to_vec()),
600 Value::BulkString(b"topic1".to_vec()),
601 Value::BulkString(b"unused".to_vec()),
602 Value::BulkString(b"data1".to_vec()),
603 ]);
604
605 let result = decode_bus_message(&stream_msg);
606 assert!(result.is_ok());
607 let msg = result.unwrap();
608 assert_eq!(msg.topic, "topic1");
609 assert_eq!(msg.payload, Bytes::from("data1"));
610 }
611
612 #[rstest]
613 fn test_decode_bus_message_missing_fields() {
614 let stream_msg = Value::Array(vec![
615 Value::BulkString(b"0".to_vec()),
616 Value::BulkString(b"topic1".to_vec()),
617 ]);
618
619 let result = decode_bus_message(&stream_msg);
620 assert!(result.is_err());
621 assert_eq!(
622 format!("{}", result.unwrap_err()),
623 "Invalid stream message format: [bulk-string('\"0\"'), bulk-string('\"topic1\"')]"
624 );
625 }
626
627 #[rstest]
628 fn test_decode_bus_message_invalid_topic_format() {
629 let stream_msg = Value::Array(vec![
630 Value::BulkString(b"0".to_vec()),
631 Value::Int(42), Value::BulkString(b"unused".to_vec()),
633 Value::BulkString(b"data1".to_vec()),
634 ]);
635
636 let result = decode_bus_message(&stream_msg);
637 assert!(result.is_err());
638 assert_eq!(
639 format!("{}", result.unwrap_err()),
640 "Invalid topic format: [bulk-string('\"0\"'), int(42), bulk-string('\"unused\"'), bulk-string('\"data1\"')]"
641 );
642 }
643
644 #[rstest]
645 fn test_decode_bus_message_invalid_payload_format() {
646 let stream_msg = Value::Array(vec![
647 Value::BulkString(b"0".to_vec()),
648 Value::BulkString(b"topic1".to_vec()),
649 Value::BulkString(b"unused".to_vec()),
650 Value::Int(42), ]);
652
653 let result = decode_bus_message(&stream_msg);
654 assert!(result.is_err());
655 assert_eq!(
656 format!("{}", result.unwrap_err()),
657 "Invalid payload format: [bulk-string('\"0\"'), bulk-string('\"topic1\"'), bulk-string('\"unused\"'), int(42)]"
658 );
659 }
660
661 #[rstest]
662 fn test_decode_bus_message_invalid_stream_msg_format() {
663 let stream_msg = Value::BulkString(b"not an array".to_vec());
664
665 let result = decode_bus_message(&stream_msg);
666 assert!(result.is_err());
667 assert_eq!(
668 format!("{}", result.unwrap_err()),
669 "Invalid stream message format: bulk-string('\"not an array\"')"
670 );
671 }
672}
673
674#[cfg(target_os = "linux")] #[cfg(test)]
676mod serial_tests {
677 use nautilus_common::testing::wait_until_async;
678 use redis::aio::ConnectionManager;
679 use rstest::*;
680
681 use super::*;
682 use crate::redis::flush_redis;
683
684 #[fixture]
685 async fn redis_connection() -> ConnectionManager {
686 let config = DatabaseConfig::default();
687 let mut con = create_redis_connection(MSGBUS_STREAM, config)
688 .await
689 .unwrap();
690 flush_redis(&mut con).await.unwrap();
691 con
692 }
693
694 #[rstest]
695 #[tokio::test(flavor = "multi_thread")]
696 async fn test_stream_messages_terminate_signal(#[future] redis_connection: ConnectionManager) {
697 let mut con = redis_connection.await;
698 let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
699
700 let trader_id = TraderId::from("tester-001");
701 let instance_id = UUID4::new();
702 let config = MessageBusConfig {
703 database: Some(DatabaseConfig::default()),
704 ..Default::default()
705 };
706
707 let stream_key = get_stream_key(trader_id, instance_id, &config);
708 let external_streams = vec![stream_key.clone()];
709 let stream_signal = Arc::new(AtomicBool::new(false));
710 let stream_signal_clone = stream_signal.clone();
711
712 let handle = tokio::spawn(async move {
714 stream_messages(
715 tx,
716 DatabaseConfig::default(),
717 external_streams,
718 stream_signal_clone,
719 )
720 .await
721 .unwrap();
722 });
723
724 stream_signal.store(true, Ordering::Relaxed);
725 let _ = rx.recv().await; rx.close();
729 handle.await.unwrap();
730 flush_redis(&mut con).await.unwrap();
731 }
732
733 #[rstest]
734 #[tokio::test(flavor = "multi_thread")]
735 async fn test_stream_messages_when_receiver_closed(
736 #[future] redis_connection: ConnectionManager,
737 ) {
738 let mut con = redis_connection.await;
739 let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
740
741 let trader_id = TraderId::from("tester-001");
742 let instance_id = UUID4::new();
743 let config = MessageBusConfig {
744 database: Some(DatabaseConfig::default()),
745 ..Default::default()
746 };
747
748 let stream_key = get_stream_key(trader_id, instance_id, &config);
749 let external_streams = vec![stream_key.clone()];
750 let stream_signal = Arc::new(AtomicBool::new(false));
751 let stream_signal_clone = stream_signal.clone();
752
753 let clock = get_atomic_clock_realtime();
756 let future_id = (clock.get_time_ms() + 1_000_000).to_string();
757
758 let _: () = con
760 .xadd(
761 stream_key,
762 future_id,
763 &[("topic", "topic1"), ("payload", "data1")],
764 )
765 .await
766 .unwrap();
767
768 rx.close();
770
771 let handle = tokio::spawn(async move {
773 stream_messages(
774 tx,
775 DatabaseConfig::default(),
776 external_streams,
777 stream_signal_clone,
778 )
779 .await
780 .unwrap();
781 });
782
783 handle.await.unwrap();
785 flush_redis(&mut con).await.unwrap();
786 }
787
788 #[rstest]
789 #[tokio::test(flavor = "multi_thread")]
790 async fn test_stream_messages(#[future] redis_connection: ConnectionManager) {
791 let mut con = redis_connection.await;
792 let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
793
794 let trader_id = TraderId::from("tester-001");
795 let instance_id = UUID4::new();
796 let config = MessageBusConfig {
797 database: Some(DatabaseConfig::default()),
798 ..Default::default()
799 };
800
801 let stream_key = get_stream_key(trader_id, instance_id, &config);
802 let external_streams = vec![stream_key.clone()];
803 let stream_signal = Arc::new(AtomicBool::new(false));
804 let stream_signal_clone = stream_signal.clone();
805
806 let clock = get_atomic_clock_realtime();
809 let future_id = (clock.get_time_ms() + 1_000_000).to_string();
810
811 let _: () = con
813 .xadd(
814 stream_key,
815 future_id,
816 &[("topic", "topic1"), ("payload", "data1")],
817 )
818 .await
819 .unwrap();
820
821 let handle = tokio::spawn(async move {
823 stream_messages(
824 tx,
825 DatabaseConfig::default(),
826 external_streams,
827 stream_signal_clone,
828 )
829 .await
830 .unwrap();
831 });
832
833 let msg = rx.recv().await.unwrap();
835 assert_eq!(msg.topic, "topic1");
836 assert_eq!(msg.payload, Bytes::from("data1"));
837
838 rx.close();
840 stream_signal.store(true, Ordering::Relaxed);
841 handle.await.unwrap();
842 flush_redis(&mut con).await.unwrap();
843 }
844
845 #[rstest]
846 #[tokio::test(flavor = "multi_thread")]
847 async fn test_publish_messages(#[future] redis_connection: ConnectionManager) {
848 let mut con = redis_connection.await;
849 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();
850
851 let trader_id = TraderId::from("tester-001");
852 let instance_id = UUID4::new();
853 let config = MessageBusConfig {
854 database: Some(DatabaseConfig::default()),
855 stream_per_topic: false,
856 ..Default::default()
857 };
858 let stream_key = get_stream_key(trader_id, instance_id, &config);
859
860 let handle = tokio::spawn(async move {
862 publish_messages(rx, trader_id, instance_id, config)
863 .await
864 .unwrap();
865 });
866
867 let msg = BusMessage::with_str_topic("test_topic", Bytes::from("test_payload"));
869 tx.send(msg).unwrap();
870
871 wait_until_async(
873 || {
874 let mut con = con.clone();
875 let stream_key = stream_key.clone();
876 async move {
877 let messages: RedisStreamBulk =
878 con.xread(&[&stream_key], &["0"]).await.unwrap();
879 !messages.is_empty()
880 }
881 },
882 Duration::from_secs(3),
883 )
884 .await;
885
886 let messages: RedisStreamBulk = con.xread(&[&stream_key], &["0"]).await.unwrap();
888 assert_eq!(messages.len(), 1);
889 let stream_msgs = messages[0].get(&stream_key).unwrap();
890 let stream_msg_array = &stream_msgs[0].values().next().unwrap();
891 let decoded_message = decode_bus_message(stream_msg_array).unwrap();
892 assert_eq!(decoded_message.topic, "test_topic");
893 assert_eq!(decoded_message.payload, Bytes::from("test_payload"));
894
895 let msg = BusMessage::new_close();
897 tx.send(msg).unwrap();
898
899 handle.await.unwrap();
901 flush_redis(&mut con).await.unwrap();
902 }
903
904 #[rstest]
905 #[tokio::test(flavor = "multi_thread")]
906 async fn test_stream_messages_multiple_streams(#[future] redis_connection: ConnectionManager) {
907 let mut con = redis_connection.await;
908 let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
909
910 let stream_key1 = "test:stream:1".to_string();
912 let stream_key2 = "test:stream:2".to_string();
913 let external_streams = vec![stream_key1.clone(), stream_key2.clone()];
914 let stream_signal = Arc::new(AtomicBool::new(false));
915 let stream_signal_clone = stream_signal.clone();
916
917 let clock = get_atomic_clock_realtime();
918 let base_id = clock.get_time_ms() + 1_000_000;
919
920 let handle = tokio::spawn(async move {
922 stream_messages(
923 tx,
924 DatabaseConfig::default(),
925 external_streams,
926 stream_signal_clone,
927 )
928 .await
929 .unwrap();
930 });
931
932 tokio::time::sleep(Duration::from_millis(200)).await;
933
934 let _: () = con
936 .xadd(
937 &stream_key1,
938 format!("{}", base_id + 100),
939 &[("topic", "stream1-first"), ("payload", "data")],
940 )
941 .await
942 .unwrap();
943
944 let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
945 .await
946 .expect("Stream 1 message should be received")
947 .unwrap();
948 assert_eq!(msg.topic, "stream1-first");
949
950 let _: () = con
952 .xadd(
953 &stream_key2,
954 format!("{}", base_id + 50),
955 &[("topic", "stream2-second"), ("payload", "data")],
956 )
957 .await
958 .unwrap();
959
960 let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
961 .await
962 .expect("Stream 2 message should be received")
963 .unwrap();
964 assert_eq!(msg.topic, "stream2-second");
965
966 rx.close();
968 stream_signal.store(true, Ordering::Relaxed);
969 handle.await.unwrap();
970 flush_redis(&mut con).await.unwrap();
971 }
972
973 #[rstest]
974 #[tokio::test(flavor = "multi_thread")]
975 async fn test_stream_messages_interleaved_at_different_rates(
976 #[future] redis_connection: ConnectionManager,
977 ) {
978 let mut con = redis_connection.await;
979 let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
980
981 let stream_key1 = "test:stream:interleaved:1".to_string();
983 let stream_key2 = "test:stream:interleaved:2".to_string();
984 let stream_key3 = "test:stream:interleaved:3".to_string();
985 let external_streams = vec![
986 stream_key1.clone(),
987 stream_key2.clone(),
988 stream_key3.clone(),
989 ];
990 let stream_signal = Arc::new(AtomicBool::new(false));
991 let stream_signal_clone = stream_signal.clone();
992
993 let clock = get_atomic_clock_realtime();
994 let base_id = clock.get_time_ms() + 1_000_000;
995
996 let handle = tokio::spawn(async move {
997 stream_messages(
998 tx,
999 DatabaseConfig::default(),
1000 external_streams,
1001 stream_signal_clone,
1002 )
1003 .await
1004 .unwrap();
1005 });
1006
1007 tokio::time::sleep(Duration::from_millis(200)).await;
1008
1009 let _: () = con
1011 .xadd(
1012 &stream_key1,
1013 format!("{}", base_id + 100),
1014 &[("topic", "s1m1"), ("payload", "data")],
1015 )
1016 .await
1017 .unwrap();
1018 let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
1019 .await
1020 .expect("Stream 1 message should be received")
1021 .unwrap();
1022 assert_eq!(msg.topic, "s1m1");
1023
1024 let _: () = con
1026 .xadd(
1027 &stream_key2,
1028 format!("{}", base_id + 50),
1029 &[("topic", "s2m1"), ("payload", "data")],
1030 )
1031 .await
1032 .unwrap();
1033 let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
1034 .await
1035 .expect("Stream 2 message should be received")
1036 .unwrap();
1037 assert_eq!(msg.topic, "s2m1");
1038
1039 let _: () = con
1041 .xadd(
1042 &stream_key3,
1043 format!("{}", base_id + 25),
1044 &[("topic", "s3m1"), ("payload", "data")],
1045 )
1046 .await
1047 .unwrap();
1048 let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
1049 .await
1050 .expect("Stream 3 message should be received")
1051 .unwrap();
1052 assert_eq!(msg.topic, "s3m1");
1053
1054 rx.close();
1056 stream_signal.store(true, Ordering::Relaxed);
1057 handle.await.unwrap();
1058 flush_redis(&mut con).await.unwrap();
1059 }
1060
1061 #[rstest]
1062 #[tokio::test(flavor = "multi_thread")]
1063 async fn test_close() {
1064 let trader_id = TraderId::from("tester-001");
1065 let instance_id = UUID4::new();
1066 let config = MessageBusConfig {
1067 database: Some(DatabaseConfig::default()),
1068 ..Default::default()
1069 };
1070
1071 let mut db = RedisMessageBusDatabase::new(trader_id, instance_id, config).unwrap();
1072
1073 db.close();
1075 }
1076
1077 #[rstest]
1078 #[tokio::test(flavor = "multi_thread")]
1079 async fn test_heartbeat_task() {
1080 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();
1081 let signal = Arc::new(AtomicBool::new(false));
1082
1083 let handle = tokio::spawn(run_heartbeat(1, signal.clone(), tx));
1085
1086 tokio::time::sleep(Duration::from_secs(2)).await;
1088
1089 signal.store(true, Ordering::Relaxed);
1091 handle.await.unwrap();
1092
1093 let mut heartbeats: Vec<BusMessage> = Vec::new();
1095 while let Ok(hb) = rx.try_recv() {
1096 heartbeats.push(hb);
1097 }
1098
1099 assert!(!heartbeats.is_empty());
1100
1101 for hb in heartbeats {
1102 assert_eq!(hb.topic, HEARTBEAT_TOPIC);
1103 }
1104 }
1105}