nautilus_common/msgbus/
database.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::fmt::Display;
17
18use bytes::Bytes;
19use nautilus_core::UUID4;
20use nautilus_model::identifiers::TraderId;
21use serde::{Deserialize, Serialize};
22
23use crate::enums::SerializationEncoding;
24
25/// Represents a bus message including a topic and payload.
26#[derive(Clone, Debug, Serialize, Deserialize)]
27#[cfg_attr(
28    feature = "python",
29    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
30)]
31pub struct BusMessage {
32    /// The topic to publish on.
33    pub topic: String,
34    /// The serialized payload for the message.
35    pub payload: Bytes,
36}
37
38impl Display for BusMessage {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(
41            f,
42            "[{}] {}",
43            self.topic,
44            String::from_utf8_lossy(&self.payload)
45        )
46    }
47}
48
49/// Configuration for database connections.
50///
51/// # Notes
52///
53/// If `database_type` is `"redis"`, it requires Redis version 6.2 or higher for correct operation.
54#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
55#[serde(default)]
56pub struct DatabaseConfig {
57    /// The database type.
58    #[serde(alias = "type")]
59    pub database_type: String,
60    /// The database host address. If `None`, the typical default should be used.
61    pub host: Option<String>,
62    /// The database port. If `None`, the typical default should be used.
63    pub port: Option<u16>,
64    /// The account username for the database connection.
65    pub username: Option<String>,
66    /// The account password for the database connection.
67    pub password: Option<String>,
68    /// If the database should use an SSL-enabled connection.
69    pub ssl: bool,
70    /// The timeout (in seconds) to wait for a new connection.
71    pub timeout: u16,
72}
73
74impl Default for DatabaseConfig {
75    /// Creates a new default [`DatabaseConfig`] instance.
76    fn default() -> Self {
77        Self {
78            database_type: "redis".to_string(),
79            host: None,
80            port: None,
81            username: None,
82            password: None,
83            ssl: false,
84            timeout: 20,
85        }
86    }
87}
88
89/// Configuration for `MessageBus` instances.
90#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
91#[serde(default)]
92pub struct MessageBusConfig {
93    /// The configuration for the message bus backing database.
94    pub database: Option<DatabaseConfig>,
95    /// The encoding for database operations, controls the type of serializer used.
96    pub encoding: SerializationEncoding,
97    /// If timestamps should be persisted as ISO 8601 strings.
98    /// If `false`, then timestamps will be persisted as UNIX nanoseconds.
99    pub timestamps_as_iso8601: bool,
100    /// The buffer interval (milliseconds) between pipelined/batched transactions.
101    /// The recommended range if using buffered pipelining is [10, 1000] milliseconds,
102    /// with a good compromise being 100 milliseconds.
103    pub buffer_interval_ms: Option<u32>,
104    /// The lookback window in minutes for automatic stream trimming.
105    /// The actual window may extend up to one minute beyond the specified value since streams are trimmed at most once every minute.
106    /// This feature requires Redis version 6.2 or higher; otherwise, it will result in a command syntax error.
107    pub autotrim_mins: Option<u32>,
108    /// If a 'trader-' prefix is used for stream names.
109    pub use_trader_prefix: bool,
110    /// If the trader's ID is used for stream names.
111    pub use_trader_id: bool,
112    /// If the trader's instance ID is used for stream names. Default is `false`.
113    pub use_instance_id: bool,
114    /// The prefix for externally published stream names. Must have a `database` config.
115    pub streams_prefix: String,
116    /// If `true`, messages will be written to separate streams per topic.
117    /// If `false`, all messages will be written to the same stream.
118    pub stream_per_topic: bool,
119    /// The external stream keys the message bus will listen to for publishing deserialized message payloads internally.
120    pub external_streams: Option<Vec<String>>,
121    /// A list of serializable types **not** to publish externally.
122    pub types_filter: Option<Vec<String>>,
123    /// The heartbeat interval (seconds).
124    pub heartbeat_interval_secs: Option<u16>,
125}
126
127impl Default for MessageBusConfig {
128    /// Creates a new default [`MessageBusConfig`] instance.
129    fn default() -> Self {
130        Self {
131            database: None,
132            encoding: SerializationEncoding::MsgPack,
133            timestamps_as_iso8601: false,
134            buffer_interval_ms: None,
135            autotrim_mins: None,
136            use_trader_prefix: true,
137            use_trader_id: true,
138            use_instance_id: false,
139            streams_prefix: "stream".to_string(),
140            stream_per_topic: true,
141            external_streams: None,
142            types_filter: None,
143            heartbeat_interval_secs: None,
144        }
145    }
146}
147
148/// A generic message bus database facade.
149///
150/// The main operations take a consistent `key` and `payload` which should provide enough
151/// information to implement the message bus database in many different technologies.
152///
153/// Delete operations may need a `payload` to target specific values.
154pub trait MessageBusDatabaseAdapter {
155    type DatabaseType;
156
157    fn new(
158        trader_id: TraderId,
159        instance_id: UUID4,
160        config: MessageBusConfig,
161    ) -> anyhow::Result<Self::DatabaseType>;
162    fn is_closed(&self) -> bool;
163    fn publish(&self, topic: String, payload: Bytes);
164    fn close(&mut self);
165}
166
167////////////////////////////////////////////////////////////////////////////////
168// Tests
169////////////////////////////////////////////////////////////////////////////////
170#[cfg(test)]
171mod tests {
172    use rstest::*;
173    use serde_json::json;
174
175    use super::*;
176
177    #[rstest]
178    fn test_default_database_config() {
179        let config = DatabaseConfig::default();
180        assert_eq!(config.database_type, "redis");
181        assert_eq!(config.host, None);
182        assert_eq!(config.port, None);
183        assert_eq!(config.username, None);
184        assert_eq!(config.password, None);
185        assert!(!config.ssl);
186        assert_eq!(config.timeout, 20);
187    }
188
189    #[rstest]
190    fn test_deserialize_database_config() {
191        let config_json = json!({
192            "type": "redis",
193            "host": "localhost",
194            "port": 6379,
195            "username": "user",
196            "password": "pass",
197            "ssl": true,
198            "timeout": 30
199        });
200        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
201        assert_eq!(config.database_type, "redis");
202        assert_eq!(config.host, Some("localhost".to_string()));
203        assert_eq!(config.port, Some(6379));
204        assert_eq!(config.username, Some("user".to_string()));
205        assert_eq!(config.password, Some("pass".to_string()));
206        assert!(config.ssl);
207        assert_eq!(config.timeout, 30);
208    }
209
210    #[rstest]
211    fn test_default_message_bus_config() {
212        let config = MessageBusConfig::default();
213        assert_eq!(config.encoding, SerializationEncoding::MsgPack);
214        assert!(!config.timestamps_as_iso8601);
215        assert_eq!(config.buffer_interval_ms, None);
216        assert_eq!(config.autotrim_mins, None);
217        assert!(config.use_trader_prefix);
218        assert!(config.use_trader_id);
219        assert!(!config.use_instance_id);
220        assert_eq!(config.streams_prefix, "stream");
221        assert!(config.stream_per_topic);
222        assert_eq!(config.external_streams, None);
223        assert_eq!(config.types_filter, None);
224    }
225
226    #[test]
227    fn test_deserialize_message_bus_config() {
228        let config_json = json!({
229            "database": {
230                "type": "redis",
231                "host": "localhost",
232                "port": 6379,
233                "username": "user",
234                "password": "pass",
235                "ssl": true,
236                "timeout": 30
237            },
238            "encoding": "json",
239            "timestamps_as_iso8601": true,
240            "buffer_interval_ms": 100,
241            "autotrim_mins": 60,
242            "use_trader_prefix": false,
243            "use_trader_id": false,
244            "use_instance_id": true,
245            "streams_prefix": "data_streams",
246            "stream_per_topic": false,
247            "external_streams": ["stream1", "stream2"],
248            "types_filter": ["type1", "type2"]
249        });
250        let config: MessageBusConfig = serde_json::from_value(config_json).unwrap();
251        assert_eq!(config.encoding, SerializationEncoding::Json);
252        assert!(config.timestamps_as_iso8601);
253        assert_eq!(config.buffer_interval_ms, Some(100));
254        assert_eq!(config.autotrim_mins, Some(60));
255        assert!(!config.use_trader_prefix);
256        assert!(!config.use_trader_id);
257        assert!(config.use_instance_id);
258        assert_eq!(config.streams_prefix, "data_streams");
259        assert!(!config.stream_per_topic);
260        assert_eq!(
261            config.external_streams,
262            Some(vec!["stream1".to_string(), "stream2".to_string()])
263        );
264        assert_eq!(
265            config.types_filter,
266            Some(vec!["type1".to_string(), "type2".to_string()])
267        );
268    }
269}