Skip to main content

nautilus_infrastructure/redis/
msgbus.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Redis-backed message bus database for the system.
17//!
18//! # Architecture
19//!
20//! Runs background tasks on `get_runtime()` for publishing, stream reading,
21//! and heartbeats. Messages are sent via an unbounded `tokio::sync::mpsc`
22//! channel to the publish task, which buffers and writes them to Redis
23//! streams. Each background task owns its own Redis connection created on
24//! the Nautilus runtime.
25//!
26//! Handles are stored as `Option<JoinHandle>` for idempotent shutdown via
27//! `close_async()`. The synchronous `close()` uses `block_in_place` to
28//! bridge into the async shutdown path and must be called from outside any
29//! `current_thread` Tokio runtime.
30
31use std::{
32    collections::{HashMap, VecDeque},
33    fmt::Debug,
34    sync::{
35        Arc,
36        atomic::{AtomicBool, Ordering},
37    },
38    time::Duration,
39};
40
41use bytes::Bytes;
42use futures::stream::Stream;
43use nautilus_common::{
44    live::get_runtime,
45    logging::{log_task_error, log_task_started, log_task_stopped},
46    msgbus::{
47        BusMessage,
48        database::{DatabaseConfig, MessageBusConfig, MessageBusDatabaseAdapter},
49        switchboard::CLOSE_TOPIC,
50    },
51};
52use nautilus_core::{
53    UUID4,
54    time::{duration_since_unix_epoch, get_atomic_clock_realtime},
55};
56use nautilus_cryptography::providers::install_cryptographic_provider;
57use nautilus_model::identifiers::TraderId;
58use redis::{AsyncCommands, streams};
59use streams::StreamReadOptions;
60use ustr::Ustr;
61
62use super::{REDIS_MINID, REDIS_XTRIM, await_handle};
63use crate::redis::{create_redis_connection, get_stream_key};
64
65const MSGBUS_PUBLISH: &str = "msgbus-publish";
66const MSGBUS_STREAM: &str = "msgbus-stream";
67const MSGBUS_HEARTBEAT: &str = "msgbus-heartbeat";
68const HEARTBEAT_TOPIC: &str = "health:heartbeat";
69const TRIM_BUFFER_SECS: u64 = 60;
70
71type RedisStreamBulk = Vec<HashMap<String, Vec<HashMap<String, redis::Value>>>>;
72
73#[cfg_attr(
74    feature = "python",
75    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
76)]
77pub struct RedisMessageBusDatabase {
78    /// The trader ID for this message bus database.
79    pub trader_id: TraderId,
80    /// The instance ID for this message bus database.
81    pub instance_id: UUID4,
82    pub_tx: tokio::sync::mpsc::UnboundedSender<BusMessage>,
83    pub_handle: Option<tokio::task::JoinHandle<()>>,
84    stream_rx: Option<tokio::sync::mpsc::Receiver<BusMessage>>,
85    stream_handle: Option<tokio::task::JoinHandle<()>>,
86    stream_signal: Arc<AtomicBool>,
87    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
88    heartbeat_signal: Arc<AtomicBool>,
89}
90
91impl Debug for RedisMessageBusDatabase {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct(stringify!(RedisMessageBusDatabase))
94            .field("trader_id", &self.trader_id)
95            .field("instance_id", &self.instance_id)
96            .finish()
97    }
98}
99
100impl MessageBusDatabaseAdapter for RedisMessageBusDatabase {
101    type DatabaseType = Self;
102
103    /// Creates a new [`RedisMessageBusDatabase`] instance for the given `trader_id`, `instance_id`, and `config`.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if:
108    /// - The database configuration is missing in `config`.
109    /// - Establishing the Redis connection for publishing fails.
110    fn new(
111        trader_id: TraderId,
112        instance_id: UUID4,
113        config: MessageBusConfig,
114    ) -> anyhow::Result<Self> {
115        install_cryptographic_provider();
116
117        let config_clone = config.clone();
118        let db_config = config
119            .database
120            .clone()
121            .ok_or_else(|| anyhow::anyhow!("No database config"))?;
122
123        let (pub_tx, pub_rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();
124
125        // Create publish task (start the runtime here for now)
126        let pub_handle = Some(get_runtime().spawn(async move {
127            if let Err(e) = publish_messages(pub_rx, trader_id, instance_id, config_clone).await {
128                log_task_error(MSGBUS_PUBLISH, &e);
129            }
130        }));
131
132        // Conditionally create stream task and channel if external streams configured
133        let external_streams = config.external_streams.clone().unwrap_or_default();
134        let stream_signal = Arc::new(AtomicBool::new(false));
135        let (stream_rx, stream_handle) = if external_streams.is_empty() {
136            (None, None)
137        } else {
138            let stream_signal_clone = stream_signal.clone();
139            let (stream_tx, stream_rx) = tokio::sync::mpsc::channel::<BusMessage>(100_000);
140            (
141                Some(stream_rx),
142                Some(get_runtime().spawn(async move {
143                    if let Err(e) =
144                        stream_messages(stream_tx, db_config, external_streams, stream_signal_clone)
145                            .await
146                    {
147                        log_task_error(MSGBUS_STREAM, &e);
148                    }
149                })),
150            )
151        };
152
153        // Create heartbeat task
154        let heartbeat_signal = Arc::new(AtomicBool::new(false));
155        let heartbeat_handle = if let Some(heartbeat_interval_secs) = config.heartbeat_interval_secs
156        {
157            let signal = heartbeat_signal.clone();
158            let pub_tx_clone = pub_tx.clone();
159
160            Some(get_runtime().spawn(async move {
161                run_heartbeat(heartbeat_interval_secs, signal, pub_tx_clone).await;
162            }))
163        } else {
164            None
165        };
166
167        Ok(Self {
168            trader_id,
169            instance_id,
170            pub_tx,
171            pub_handle,
172            stream_rx,
173            stream_handle,
174            stream_signal,
175            heartbeat_handle,
176            heartbeat_signal,
177        })
178    }
179
180    /// Returns whether the message bus database adapter publishing channel is closed.
181    fn is_closed(&self) -> bool {
182        self.pub_tx.is_closed()
183    }
184
185    /// Publishes a message with the given `topic` and `payload`.
186    fn publish(&self, topic: Ustr, payload: Bytes) {
187        let msg = BusMessage::new(topic, payload);
188        if let Err(e) = self.pub_tx.send(msg) {
189            log::error!("Failed to send message: {e}");
190        }
191    }
192
193    /// Closes the message bus database adapter.
194    fn close(&mut self) {
195        log::debug!("Closing");
196
197        self.stream_signal.store(true, Ordering::Relaxed);
198        self.heartbeat_signal.store(true, Ordering::Relaxed);
199
200        if !self.pub_tx.is_closed() {
201            let msg = BusMessage::new_close();
202
203            if let Err(e) = self.pub_tx.send(msg) {
204                log::error!("Failed to send close message: {e:?}");
205            }
206        }
207
208        // Keep close sync for now to avoid async trait method
209        tokio::task::block_in_place(|| {
210            get_runtime().block_on(async {
211                self.close_async().await;
212            });
213        });
214
215        log::debug!("Closed");
216    }
217}
218
219impl RedisMessageBusDatabase {
220    /// Retrieves the Redis stream receiver for this message bus instance.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the stream receiver has already been taken.
225    pub fn get_stream_receiver(
226        &mut self,
227    ) -> anyhow::Result<tokio::sync::mpsc::Receiver<BusMessage>> {
228        self.stream_rx
229            .take()
230            .ok_or_else(|| anyhow::anyhow!("Stream receiver already taken"))
231    }
232
233    /// Streams messages arriving on the stream receiver channel.
234    pub fn stream(
235        mut stream_rx: tokio::sync::mpsc::Receiver<BusMessage>,
236    ) -> impl Stream<Item = BusMessage> + 'static {
237        async_stream::stream! {
238            while let Some(msg) = stream_rx.recv().await {
239                yield msg;
240            }
241        }
242    }
243
244    pub async fn close_async(&mut self) {
245        await_handle(self.pub_handle.take(), MSGBUS_PUBLISH).await;
246        await_handle(self.stream_handle.take(), MSGBUS_STREAM).await;
247        await_handle(self.heartbeat_handle.take(), MSGBUS_HEARTBEAT).await;
248    }
249}
250
251/// Publishes messages received on `rx` to Redis streams for the given `trader_id` and `instance_id`, using `config`.
252///
253/// # Errors
254///
255/// Returns an error if:
256/// - The database configuration is missing in `config`.
257/// - Establishing the Redis connection fails.
258/// - Any Redis command fails during publishing.
259pub async fn publish_messages(
260    mut rx: tokio::sync::mpsc::UnboundedReceiver<BusMessage>,
261    trader_id: TraderId,
262    instance_id: UUID4,
263    config: MessageBusConfig,
264) -> anyhow::Result<()> {
265    log_task_started(MSGBUS_PUBLISH);
266
267    let db_config = config
268        .database
269        .as_ref()
270        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
271    let mut con = create_redis_connection(MSGBUS_PUBLISH, db_config.clone()).await?;
272    let stream_key = get_stream_key(trader_id, instance_id, &config);
273
274    // Auto-trimming
275    let autotrim_duration = config
276        .autotrim_mins
277        .filter(|&mins| mins > 0)
278        .map(|mins| Duration::from_secs(u64::from(mins) * 60));
279    let mut last_trim_index: HashMap<String, usize> = HashMap::new();
280
281    // Buffering
282    let mut buffer: VecDeque<BusMessage> = VecDeque::new();
283    let buffer_interval = Duration::from_millis(u64::from(config.buffer_interval_ms.unwrap_or(0)));
284
285    // A sleep used to trigger periodic flushing of the buffer.
286    // When `buffer_interval` is zero we skip using the timer and flush immediately
287    // after every message.
288    let flush_timer = tokio::time::sleep(buffer_interval);
289    tokio::pin!(flush_timer);
290
291    loop {
292        tokio::select! {
293            maybe_msg = rx.recv() => {
294                if let Some(msg) = maybe_msg {
295                    if msg.topic == CLOSE_TOPIC {
296                        log::debug!("Received close message");
297                        // Ensure we exit the loop after flushing any remaining messages.
298                        if !buffer.is_empty() {
299                            drain_buffer(
300                                &mut con,
301                                &stream_key,
302                                config.stream_per_topic,
303                                autotrim_duration,
304                                &mut last_trim_index,
305                                &mut buffer,
306                            ).await?;
307                        }
308                        break;
309                    }
310
311                    buffer.push_back(msg);
312
313                    if buffer_interval.is_zero() {
314                        // Immediate flush mode
315                        drain_buffer(
316                            &mut con,
317                            &stream_key,
318                            config.stream_per_topic,
319                            autotrim_duration,
320                            &mut last_trim_index,
321                            &mut buffer,
322                        ).await?;
323                    }
324                } else {
325                    log::debug!("Channel hung up");
326                    break;
327                }
328            }
329            // Only poll the timer when the interval is non-zero. This avoids
330            // unnecessarily waking the task when immediate flushing is enabled.
331            () = &mut flush_timer, if !buffer_interval.is_zero() => {
332                if !buffer.is_empty() {
333                    drain_buffer(
334                        &mut con,
335                        &stream_key,
336                        config.stream_per_topic,
337                        autotrim_duration,
338                        &mut last_trim_index,
339                        &mut buffer,
340                    ).await?;
341                }
342
343                // Schedule the next tick
344                flush_timer.as_mut().reset(tokio::time::Instant::now() + buffer_interval);
345            }
346        }
347    }
348
349    // Drain any remaining messages
350    if !buffer.is_empty() {
351        drain_buffer(
352            &mut con,
353            &stream_key,
354            config.stream_per_topic,
355            autotrim_duration,
356            &mut last_trim_index,
357            &mut buffer,
358        )
359        .await?;
360    }
361
362    log_task_stopped(MSGBUS_PUBLISH);
363    Ok(())
364}
365
366async fn drain_buffer(
367    conn: &mut redis::aio::ConnectionManager,
368    stream_key: &str,
369    stream_per_topic: bool,
370    autotrim_duration: Option<Duration>,
371    last_trim_index: &mut HashMap<String, usize>,
372    buffer: &mut VecDeque<BusMessage>,
373) -> anyhow::Result<()> {
374    let mut pipe = redis::pipe();
375    pipe.atomic();
376
377    for msg in buffer.drain(..) {
378        let items: Vec<(&str, &[u8])> = vec![
379            ("topic", msg.topic.as_ref()),
380            ("payload", msg.payload.as_ref()),
381        ];
382        let stream_key = if stream_per_topic {
383            format!("{stream_key}:{}", &msg.topic)
384        } else {
385            stream_key.to_string()
386        };
387        pipe.xadd(&stream_key, "*", &items);
388
389        if autotrim_duration.is_none() {
390            continue; // Nothing else to do
391        }
392
393        // Autotrim stream
394        let last_trim_ms = last_trim_index.entry(stream_key.clone()).or_insert(0); // Remove clone
395        let unix_duration_now = duration_since_unix_epoch();
396        let trim_buffer = Duration::from_secs(TRIM_BUFFER_SECS);
397
398        // Improve efficiency of this by batching
399        if *last_trim_ms < (unix_duration_now - trim_buffer).as_millis() as usize {
400            let min_timestamp_ms =
401                (unix_duration_now - autotrim_duration.unwrap()).as_millis() as usize;
402            let result: Result<(), redis::RedisError> = redis::cmd(REDIS_XTRIM)
403                .arg(stream_key.clone())
404                .arg(REDIS_MINID)
405                .arg(min_timestamp_ms)
406                .query_async(conn)
407                .await;
408
409            if let Err(e) = result {
410                log::error!("Error trimming stream '{stream_key}': {e}");
411            } else {
412                last_trim_index.insert(stream_key.clone(), unix_duration_now.as_millis() as usize);
413            }
414        }
415    }
416
417    pipe.query_async(conn).await.map_err(anyhow::Error::from)
418}
419
420/// Streams messages from Redis streams and sends them over the provided `tx` channel.
421///
422/// # Errors
423///
424/// Returns an error if:
425/// - Establishing the Redis connection fails.
426/// - Any Redis read operation fails.
427pub async fn stream_messages(
428    tx: tokio::sync::mpsc::Sender<BusMessage>,
429    config: DatabaseConfig,
430    stream_keys: Vec<String>,
431    stream_signal: Arc<AtomicBool>,
432) -> anyhow::Result<()> {
433    log_task_started(MSGBUS_STREAM);
434
435    let mut con = create_redis_connection(MSGBUS_STREAM, config).await?;
436
437    let stream_keys = &stream_keys
438        .iter()
439        .map(String::as_str)
440        .collect::<Vec<&str>>();
441
442    log::debug!("Listening to streams: [{}]", stream_keys.join(", "));
443
444    // Start streaming from current timestamp
445    let clock = get_atomic_clock_realtime();
446    let timestamp_ms = clock.get_time_ms();
447    let initial_id = timestamp_ms.to_string();
448
449    let mut last_ids: HashMap<String, String> = stream_keys
450        .iter()
451        .map(|&key| (key.to_string(), initial_id.clone()))
452        .collect();
453
454    let opts = StreamReadOptions::default().block(100);
455
456    'outer: loop {
457        if stream_signal.load(Ordering::Relaxed) {
458            log::debug!("Received streaming terminate signal");
459            break;
460        }
461
462        let ids: Vec<String> = stream_keys
463            .iter()
464            .map(|&key| last_ids[key].clone())
465            .collect();
466        let id_refs: Vec<&str> = ids.iter().map(String::as_str).collect();
467
468        let result: Result<RedisStreamBulk, _> =
469            con.xread_options(&[&stream_keys], &[&id_refs], &opts).await;
470        match result {
471            Ok(stream_bulk) => {
472                if stream_bulk.is_empty() {
473                    // Timeout occurred: no messages received
474                    continue;
475                }
476                for entry in &stream_bulk {
477                    for (stream_key, stream_msgs) in entry {
478                        for stream_msg in stream_msgs {
479                            for (id, array) in stream_msg {
480                                last_ids.insert(stream_key.clone(), id.clone());
481
482                                match decode_bus_message(array) {
483                                    Ok(msg) => {
484                                        if let Err(e) = tx.send(msg).await {
485                                            log::debug!("Channel closed: {e:?}");
486                                            break 'outer; // End streaming
487                                        }
488                                    }
489                                    Err(e) => {
490                                        log::error!("{e:?}");
491                                        continue;
492                                    }
493                                }
494                            }
495                        }
496                    }
497                }
498            }
499            Err(e) => {
500                anyhow::bail!("Error reading from stream: {e:?}");
501            }
502        }
503    }
504
505    log_task_stopped(MSGBUS_STREAM);
506    Ok(())
507}
508
509/// Decodes a Redis stream message value into a `BusMessage`.
510///
511/// # Errors
512///
513/// Returns an error if:
514/// - The incoming `stream_msg` is not an array.
515/// - The array has fewer than four elements (invalid format).
516/// - Parsing the topic or payload fails.
517fn decode_bus_message(stream_msg: &redis::Value) -> anyhow::Result<BusMessage> {
518    if let redis::Value::Array(stream_msg) = stream_msg {
519        if stream_msg.len() < 4 {
520            anyhow::bail!("Invalid stream message format: {stream_msg:?}");
521        }
522
523        let topic = match &stream_msg[1] {
524            redis::Value::BulkString(bytes) => match String::from_utf8(bytes.clone()) {
525                Ok(topic) => topic,
526                Err(e) => anyhow::bail!("Error parsing topic: {e}"),
527            },
528            _ => {
529                anyhow::bail!("Invalid topic format: {stream_msg:?}");
530            }
531        };
532
533        let payload = match &stream_msg[3] {
534            redis::Value::BulkString(bytes) => Bytes::copy_from_slice(bytes),
535            _ => {
536                anyhow::bail!("Invalid payload format: {stream_msg:?}");
537            }
538        };
539
540        Ok(BusMessage::with_str_topic(topic, payload))
541    } else {
542        anyhow::bail!("Invalid stream message format: {stream_msg:?}")
543    }
544}
545
546async fn run_heartbeat(
547    heartbeat_interval_secs: u16,
548    signal: Arc<AtomicBool>,
549    pub_tx: tokio::sync::mpsc::UnboundedSender<BusMessage>,
550) {
551    log_task_started("heartbeat");
552    log::debug!("Heartbeat at {heartbeat_interval_secs} second intervals");
553
554    let heartbeat_interval = Duration::from_secs(u64::from(heartbeat_interval_secs));
555    let heartbeat_timer = tokio::time::interval(heartbeat_interval);
556
557    let check_interval = Duration::from_millis(100);
558    let check_timer = tokio::time::interval(check_interval);
559
560    tokio::pin!(heartbeat_timer);
561    tokio::pin!(check_timer);
562
563    loop {
564        if signal.load(Ordering::Relaxed) {
565            log::debug!("Received heartbeat terminate signal");
566            break;
567        }
568
569        tokio::select! {
570            _ = heartbeat_timer.tick() => {
571                let heartbeat = create_heartbeat_msg();
572                if let Err(e) = pub_tx.send(heartbeat) {
573                    // We expect an error if the channel is closed during shutdown
574                    log::debug!("Error sending heartbeat: {e}");
575                }
576            },
577            _ = check_timer.tick() => {}
578        }
579    }
580
581    log_task_stopped("heartbeat");
582}
583
584fn create_heartbeat_msg() -> BusMessage {
585    let payload = Bytes::from(chrono::Utc::now().to_rfc3339().into_bytes());
586    BusMessage::with_str_topic(HEARTBEAT_TOPIC, payload)
587}
588
589#[cfg(test)]
590mod tests {
591    use redis::Value;
592    use rstest::*;
593
594    use super::*;
595
596    #[rstest]
597    fn test_decode_bus_message_valid() {
598        let stream_msg = Value::Array(vec![
599            Value::BulkString(b"0".to_vec()),
600            Value::BulkString(b"topic1".to_vec()),
601            Value::BulkString(b"unused".to_vec()),
602            Value::BulkString(b"data1".to_vec()),
603        ]);
604
605        let result = decode_bus_message(&stream_msg);
606        assert!(result.is_ok());
607        let msg = result.unwrap();
608        assert_eq!(msg.topic, "topic1");
609        assert_eq!(msg.payload, Bytes::from("data1"));
610    }
611
612    #[rstest]
613    fn test_decode_bus_message_missing_fields() {
614        let stream_msg = Value::Array(vec![
615            Value::BulkString(b"0".to_vec()),
616            Value::BulkString(b"topic1".to_vec()),
617        ]);
618
619        let result = decode_bus_message(&stream_msg);
620        assert!(result.is_err());
621        assert_eq!(
622            format!("{}", result.unwrap_err()),
623            "Invalid stream message format: [bulk-string('\"0\"'), bulk-string('\"topic1\"')]"
624        );
625    }
626
627    #[rstest]
628    fn test_decode_bus_message_invalid_topic_format() {
629        let stream_msg = Value::Array(vec![
630            Value::BulkString(b"0".to_vec()),
631            Value::Int(42), // Invalid topic format
632            Value::BulkString(b"unused".to_vec()),
633            Value::BulkString(b"data1".to_vec()),
634        ]);
635
636        let result = decode_bus_message(&stream_msg);
637        assert!(result.is_err());
638        assert_eq!(
639            format!("{}", result.unwrap_err()),
640            "Invalid topic format: [bulk-string('\"0\"'), int(42), bulk-string('\"unused\"'), bulk-string('\"data1\"')]"
641        );
642    }
643
644    #[rstest]
645    fn test_decode_bus_message_invalid_payload_format() {
646        let stream_msg = Value::Array(vec![
647            Value::BulkString(b"0".to_vec()),
648            Value::BulkString(b"topic1".to_vec()),
649            Value::BulkString(b"unused".to_vec()),
650            Value::Int(42), // Invalid payload format
651        ]);
652
653        let result = decode_bus_message(&stream_msg);
654        assert!(result.is_err());
655        assert_eq!(
656            format!("{}", result.unwrap_err()),
657            "Invalid payload format: [bulk-string('\"0\"'), bulk-string('\"topic1\"'), bulk-string('\"unused\"'), int(42)]"
658        );
659    }
660
661    #[rstest]
662    fn test_decode_bus_message_invalid_stream_msg_format() {
663        let stream_msg = Value::BulkString(b"not an array".to_vec());
664
665        let result = decode_bus_message(&stream_msg);
666        assert!(result.is_err());
667        assert_eq!(
668            format!("{}", result.unwrap_err()),
669            "Invalid stream message format: bulk-string('\"not an array\"')"
670        );
671    }
672}
673
674#[cfg(target_os = "linux")] // Run Redis tests on Linux platforms only
675#[cfg(test)]
676mod serial_tests {
677    use nautilus_common::testing::wait_until_async;
678    use redis::aio::ConnectionManager;
679    use rstest::*;
680
681    use super::*;
682    use crate::redis::flush_redis;
683
684    #[fixture]
685    async fn redis_connection() -> ConnectionManager {
686        let config = DatabaseConfig::default();
687        let mut con = create_redis_connection(MSGBUS_STREAM, config)
688            .await
689            .unwrap();
690        flush_redis(&mut con).await.unwrap();
691        con
692    }
693
694    #[rstest]
695    #[tokio::test(flavor = "multi_thread")]
696    async fn test_stream_messages_terminate_signal(#[future] redis_connection: ConnectionManager) {
697        let mut con = redis_connection.await;
698        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
699
700        let trader_id = TraderId::from("tester-001");
701        let instance_id = UUID4::new();
702        let config = MessageBusConfig {
703            database: Some(DatabaseConfig::default()),
704            ..Default::default()
705        };
706
707        let stream_key = get_stream_key(trader_id, instance_id, &config);
708        let external_streams = vec![stream_key.clone()];
709        let stream_signal = Arc::new(AtomicBool::new(false));
710        let stream_signal_clone = stream_signal.clone();
711
712        // Start the message streaming task
713        let handle = tokio::spawn(async move {
714            stream_messages(
715                tx,
716                DatabaseConfig::default(),
717                external_streams,
718                stream_signal_clone,
719            )
720            .await
721            .unwrap();
722        });
723
724        stream_signal.store(true, Ordering::Relaxed);
725        let _ = rx.recv().await; // Wait for the tx to close
726
727        // Shutdown and cleanup
728        rx.close();
729        handle.await.unwrap();
730        flush_redis(&mut con).await.unwrap();
731    }
732
733    #[rstest]
734    #[tokio::test(flavor = "multi_thread")]
735    async fn test_stream_messages_when_receiver_closed(
736        #[future] redis_connection: ConnectionManager,
737    ) {
738        let mut con = redis_connection.await;
739        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
740
741        let trader_id = TraderId::from("tester-001");
742        let instance_id = UUID4::new();
743        let config = MessageBusConfig {
744            database: Some(DatabaseConfig::default()),
745            ..Default::default()
746        };
747
748        let stream_key = get_stream_key(trader_id, instance_id, &config);
749        let external_streams = vec![stream_key.clone()];
750        let stream_signal = Arc::new(AtomicBool::new(false));
751        let stream_signal_clone = stream_signal.clone();
752
753        // Use a message ID in the future, as streaming begins
754        // around the timestamp the task is spawned.
755        let clock = get_atomic_clock_realtime();
756        let future_id = (clock.get_time_ms() + 1_000_000).to_string();
757
758        // Publish test message
759        let _: () = con
760            .xadd(
761                stream_key,
762                future_id,
763                &[("topic", "topic1"), ("payload", "data1")],
764            )
765            .await
766            .unwrap();
767
768        // Immediately close channel
769        rx.close();
770
771        // Start the message streaming task
772        let handle = tokio::spawn(async move {
773            stream_messages(
774                tx,
775                DatabaseConfig::default(),
776                external_streams,
777                stream_signal_clone,
778            )
779            .await
780            .unwrap();
781        });
782
783        // Shutdown and cleanup
784        handle.await.unwrap();
785        flush_redis(&mut con).await.unwrap();
786    }
787
788    #[rstest]
789    #[tokio::test(flavor = "multi_thread")]
790    async fn test_stream_messages(#[future] redis_connection: ConnectionManager) {
791        let mut con = redis_connection.await;
792        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
793
794        let trader_id = TraderId::from("tester-001");
795        let instance_id = UUID4::new();
796        let config = MessageBusConfig {
797            database: Some(DatabaseConfig::default()),
798            ..Default::default()
799        };
800
801        let stream_key = get_stream_key(trader_id, instance_id, &config);
802        let external_streams = vec![stream_key.clone()];
803        let stream_signal = Arc::new(AtomicBool::new(false));
804        let stream_signal_clone = stream_signal.clone();
805
806        // Use a message ID in the future, as streaming begins
807        // around the timestamp the task is spawned.
808        let clock = get_atomic_clock_realtime();
809        let future_id = (clock.get_time_ms() + 1_000_000).to_string();
810
811        // Publish test message
812        let _: () = con
813            .xadd(
814                stream_key,
815                future_id,
816                &[("topic", "topic1"), ("payload", "data1")],
817            )
818            .await
819            .unwrap();
820
821        // Start the message streaming task
822        let handle = tokio::spawn(async move {
823            stream_messages(
824                tx,
825                DatabaseConfig::default(),
826                external_streams,
827                stream_signal_clone,
828            )
829            .await
830            .unwrap();
831        });
832
833        // Receive and verify the message
834        let msg = rx.recv().await.unwrap();
835        assert_eq!(msg.topic, "topic1");
836        assert_eq!(msg.payload, Bytes::from("data1"));
837
838        // Shutdown and cleanup
839        rx.close();
840        stream_signal.store(true, Ordering::Relaxed);
841        handle.await.unwrap();
842        flush_redis(&mut con).await.unwrap();
843    }
844
845    #[rstest]
846    #[tokio::test(flavor = "multi_thread")]
847    async fn test_publish_messages(#[future] redis_connection: ConnectionManager) {
848        let mut con = redis_connection.await;
849        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();
850
851        let trader_id = TraderId::from("tester-001");
852        let instance_id = UUID4::new();
853        let config = MessageBusConfig {
854            database: Some(DatabaseConfig::default()),
855            stream_per_topic: false,
856            ..Default::default()
857        };
858        let stream_key = get_stream_key(trader_id, instance_id, &config);
859
860        // Start the publish_messages task
861        let handle = tokio::spawn(async move {
862            publish_messages(rx, trader_id, instance_id, config)
863                .await
864                .unwrap();
865        });
866
867        // Send a test message
868        let msg = BusMessage::with_str_topic("test_topic", Bytes::from("test_payload"));
869        tx.send(msg).unwrap();
870
871        // Wait until the message is published to Redis
872        wait_until_async(
873            || {
874                let mut con = con.clone();
875                let stream_key = stream_key.clone();
876                async move {
877                    let messages: RedisStreamBulk =
878                        con.xread(&[&stream_key], &["0"]).await.unwrap();
879                    !messages.is_empty()
880                }
881            },
882            Duration::from_secs(3),
883        )
884        .await;
885
886        // Verify the message was published to Redis
887        let messages: RedisStreamBulk = con.xread(&[&stream_key], &["0"]).await.unwrap();
888        assert_eq!(messages.len(), 1);
889        let stream_msgs = messages[0].get(&stream_key).unwrap();
890        let stream_msg_array = &stream_msgs[0].values().next().unwrap();
891        let decoded_message = decode_bus_message(stream_msg_array).unwrap();
892        assert_eq!(decoded_message.topic, "test_topic");
893        assert_eq!(decoded_message.payload, Bytes::from("test_payload"));
894
895        // Stop publishing task
896        let msg = BusMessage::new_close();
897        tx.send(msg).unwrap();
898
899        // Shutdown and cleanup
900        handle.await.unwrap();
901        flush_redis(&mut con).await.unwrap();
902    }
903
904    #[rstest]
905    #[tokio::test(flavor = "multi_thread")]
906    async fn test_stream_messages_multiple_streams(#[future] redis_connection: ConnectionManager) {
907        let mut con = redis_connection.await;
908        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
909
910        // Setup multiple stream keys
911        let stream_key1 = "test:stream:1".to_string();
912        let stream_key2 = "test:stream:2".to_string();
913        let external_streams = vec![stream_key1.clone(), stream_key2.clone()];
914        let stream_signal = Arc::new(AtomicBool::new(false));
915        let stream_signal_clone = stream_signal.clone();
916
917        let clock = get_atomic_clock_realtime();
918        let base_id = clock.get_time_ms() + 1_000_000;
919
920        // Start streaming task
921        let handle = tokio::spawn(async move {
922            stream_messages(
923                tx,
924                DatabaseConfig::default(),
925                external_streams,
926                stream_signal_clone,
927            )
928            .await
929            .unwrap();
930        });
931
932        tokio::time::sleep(Duration::from_millis(200)).await;
933
934        // Publish to stream 1 at higher ID
935        let _: () = con
936            .xadd(
937                &stream_key1,
938                format!("{}", base_id + 100),
939                &[("topic", "stream1-first"), ("payload", "data")],
940            )
941            .await
942            .unwrap();
943
944        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
945            .await
946            .expect("Stream 1 message should be received")
947            .unwrap();
948        assert_eq!(msg.topic, "stream1-first");
949
950        // Publish to stream 2 at lower ID (tests independent cursor tracking)
951        let _: () = con
952            .xadd(
953                &stream_key2,
954                format!("{}", base_id + 50),
955                &[("topic", "stream2-second"), ("payload", "data")],
956            )
957            .await
958            .unwrap();
959
960        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
961            .await
962            .expect("Stream 2 message should be received")
963            .unwrap();
964        assert_eq!(msg.topic, "stream2-second");
965
966        // Shutdown and cleanup
967        rx.close();
968        stream_signal.store(true, Ordering::Relaxed);
969        handle.await.unwrap();
970        flush_redis(&mut con).await.unwrap();
971    }
972
973    #[rstest]
974    #[tokio::test(flavor = "multi_thread")]
975    async fn test_stream_messages_interleaved_at_different_rates(
976        #[future] redis_connection: ConnectionManager,
977    ) {
978        let mut con = redis_connection.await;
979        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
980
981        // Setup multiple stream keys
982        let stream_key1 = "test:stream:interleaved:1".to_string();
983        let stream_key2 = "test:stream:interleaved:2".to_string();
984        let stream_key3 = "test:stream:interleaved:3".to_string();
985        let external_streams = vec![
986            stream_key1.clone(),
987            stream_key2.clone(),
988            stream_key3.clone(),
989        ];
990        let stream_signal = Arc::new(AtomicBool::new(false));
991        let stream_signal_clone = stream_signal.clone();
992
993        let clock = get_atomic_clock_realtime();
994        let base_id = clock.get_time_ms() + 1_000_000;
995
996        let handle = tokio::spawn(async move {
997            stream_messages(
998                tx,
999                DatabaseConfig::default(),
1000                external_streams,
1001                stream_signal_clone,
1002            )
1003            .await
1004            .unwrap();
1005        });
1006
1007        tokio::time::sleep(Duration::from_millis(200)).await;
1008
1009        // Stream 1 advances with high ID
1010        let _: () = con
1011            .xadd(
1012                &stream_key1,
1013                format!("{}", base_id + 100),
1014                &[("topic", "s1m1"), ("payload", "data")],
1015            )
1016            .await
1017            .unwrap();
1018        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
1019            .await
1020            .expect("Stream 1 message should be received")
1021            .unwrap();
1022        assert_eq!(msg.topic, "s1m1");
1023
1024        // Stream 2 gets message at lower ID - would be skipped with global cursor
1025        let _: () = con
1026            .xadd(
1027                &stream_key2,
1028                format!("{}", base_id + 50),
1029                &[("topic", "s2m1"), ("payload", "data")],
1030            )
1031            .await
1032            .unwrap();
1033        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
1034            .await
1035            .expect("Stream 2 message should be received")
1036            .unwrap();
1037        assert_eq!(msg.topic, "s2m1");
1038
1039        // Stream 3 gets message at even lower ID
1040        let _: () = con
1041            .xadd(
1042                &stream_key3,
1043                format!("{}", base_id + 25),
1044                &[("topic", "s3m1"), ("payload", "data")],
1045            )
1046            .await
1047            .unwrap();
1048        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
1049            .await
1050            .expect("Stream 3 message should be received")
1051            .unwrap();
1052        assert_eq!(msg.topic, "s3m1");
1053
1054        // Shutdown and cleanup
1055        rx.close();
1056        stream_signal.store(true, Ordering::Relaxed);
1057        handle.await.unwrap();
1058        flush_redis(&mut con).await.unwrap();
1059    }
1060
1061    #[rstest]
1062    #[tokio::test(flavor = "multi_thread")]
1063    async fn test_close() {
1064        let trader_id = TraderId::from("tester-001");
1065        let instance_id = UUID4::new();
1066        let config = MessageBusConfig {
1067            database: Some(DatabaseConfig::default()),
1068            ..Default::default()
1069        };
1070
1071        let mut db = RedisMessageBusDatabase::new(trader_id, instance_id, config).unwrap();
1072
1073        // Close the message bus database (test should not hang)
1074        db.close();
1075    }
1076
1077    #[rstest]
1078    #[tokio::test(flavor = "multi_thread")]
1079    async fn test_heartbeat_task() {
1080        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();
1081        let signal = Arc::new(AtomicBool::new(false));
1082
1083        // Start the heartbeat task with a short interval
1084        let handle = tokio::spawn(run_heartbeat(1, signal.clone(), tx));
1085
1086        // Wait for a couple of heartbeats
1087        tokio::time::sleep(Duration::from_secs(2)).await;
1088
1089        // Stop the heartbeat task
1090        signal.store(true, Ordering::Relaxed);
1091        handle.await.unwrap();
1092
1093        // Ensure heartbeats were sent
1094        let mut heartbeats: Vec<BusMessage> = Vec::new();
1095        while let Ok(hb) = rx.try_recv() {
1096            heartbeats.push(hb);
1097        }
1098
1099        assert!(!heartbeats.is_empty());
1100
1101        for hb in heartbeats {
1102            assert_eq!(hb.topic, HEARTBEAT_TOPIC);
1103        }
1104    }
1105}