nautilus_common/msgbus/
database.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::fmt::Display;
17
18use bytes::Bytes;
19use nautilus_core::UUID4;
20use nautilus_model::identifiers::TraderId;
21use serde::{Deserialize, Serialize};
22
23use crate::enums::SerializationEncoding;
24
/// Represents a bus message including a topic and payload.
///
/// The payload is carried as raw bytes. When the `python` feature is enabled the
/// type is additionally exposed as a `pyo3` class.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
)]
pub struct BusMessage {
    /// The topic to publish on.
    pub topic: String,
    /// The serialized payload for the message.
    pub payload: Bytes,
}
37
38impl Display for BusMessage {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(
41            f,
42            "[{}] {}",
43            self.topic,
44            String::from_utf8_lossy(&self.payload)
45        )
46    }
47}
48
/// Configuration for database connections.
///
/// Because of the container-level `#[serde(default)]`, any field omitted from the
/// input during deserialization takes its value from the [`Default`] implementation.
///
/// # Notes
///
/// If `database_type` is `"redis"`, it requires Redis version 6.2 or higher for correct operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DatabaseConfig {
    /// The database type.
    ///
    /// When deserializing, the key `type` is accepted as an alias for this field.
    #[serde(alias = "type")]
    pub database_type: String,
    /// The database host address. If `None`, the typical default should be used.
    pub host: Option<String>,
    /// The database port. If `None`, the typical default should be used.
    pub port: Option<u16>,
    /// The account username for the database connection.
    pub username: Option<String>,
    /// The account password for the database connection.
    pub password: Option<String>,
    /// If the database should use an SSL-enabled connection.
    pub ssl: bool,
    /// The timeout (in seconds) to wait for a new connection.
    pub connection_timeout: u16,
    /// The timeout (in seconds) to wait for a response.
    pub response_timeout: u16,
    /// The number of retry attempts with exponential backoff for connection attempts.
    pub number_of_retries: usize,
    /// The base value for exponential backoff calculation.
    pub exponent_base: u64,
    /// The maximum delay between retry attempts (in seconds).
    pub max_delay: u64,
    /// The multiplication factor for retry delay calculation.
    pub factor: u64,
}
83
84impl Default for DatabaseConfig {
85    /// Creates a new default [`DatabaseConfig`] instance.
86    fn default() -> Self {
87        Self {
88            database_type: "redis".to_string(),
89            host: None,
90            port: None,
91            username: None,
92            password: None,
93            ssl: false,
94            connection_timeout: 20,
95            response_timeout: 20,
96            number_of_retries: 100,
97            exponent_base: 2,
98            max_delay: 1000,
99            factor: 2,
100        }
101    }
102}
103
/// Configuration for `MessageBus` instances.
///
/// Because of the container-level `#[serde(default)]`, any field omitted from the
/// input during deserialization takes its value from the [`Default`] implementation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct MessageBusConfig {
    /// The configuration for the message bus backing database.
    pub database: Option<DatabaseConfig>,
    /// The encoding for database operations, controls the type of serializer used.
    pub encoding: SerializationEncoding,
    /// If timestamps should be persisted as ISO 8601 strings.
    /// If `false`, then timestamps will be persisted as UNIX nanoseconds.
    pub timestamps_as_iso8601: bool,
    /// The buffer interval (milliseconds) between pipelined/batched transactions.
    /// The recommended range if using buffered pipelining is [10, 1000] milliseconds,
    /// with a good compromise being 100 milliseconds.
    pub buffer_interval_ms: Option<u32>,
    /// The lookback window in minutes for automatic stream trimming.
    /// The actual window may extend up to one minute beyond the specified value since
    /// streams are trimmed at most once every minute.
    /// This feature requires Redis version 6.2 or higher; otherwise, it will result in
    /// a command syntax error.
    pub autotrim_mins: Option<u32>,
    /// If a 'trader-' prefix is used for stream names.
    pub use_trader_prefix: bool,
    /// If the trader's ID is used for stream names.
    pub use_trader_id: bool,
    /// If the trader's instance ID is used for stream names. Default is `false`.
    pub use_instance_id: bool,
    /// The prefix for externally published stream names. Must have a `database` config.
    pub streams_prefix: String,
    /// If `true`, messages will be written to separate streams per topic.
    /// If `false`, all messages will be written to the same stream.
    pub stream_per_topic: bool,
    /// The external stream keys the message bus will listen to for publishing
    /// deserialized message payloads internally.
    pub external_streams: Option<Vec<String>>,
    /// A list of serializable types **not** to publish externally.
    pub types_filter: Option<Vec<String>>,
    /// The heartbeat interval (seconds).
    pub heartbeat_interval_secs: Option<u16>,
}
141
142impl Default for MessageBusConfig {
143    /// Creates a new default [`MessageBusConfig`] instance.
144    fn default() -> Self {
145        Self {
146            database: None,
147            encoding: SerializationEncoding::MsgPack,
148            timestamps_as_iso8601: false,
149            buffer_interval_ms: None,
150            autotrim_mins: None,
151            use_trader_prefix: true,
152            use_trader_id: true,
153            use_instance_id: false,
154            streams_prefix: "stream".to_string(),
155            stream_per_topic: true,
156            external_streams: None,
157            types_filter: None,
158            heartbeat_interval_secs: None,
159        }
160    }
161}
162
/// A generic message bus database facade.
///
/// The main operation, [`Self::publish`], takes a `topic` and `payload`, which should provide
/// enough information to implement the message bus database in many different technologies.
///
/// Delete operations may need a `payload` to target specific values.
// NOTE(review): no delete operations are declared on this trait; the sentence above may be
// stale and should be confirmed against the implementations.
pub trait MessageBusDatabaseAdapter {
    /// The concrete type produced by [`Self::new`].
    type DatabaseType;

    /// Creates a new adapter for the given trader ID, instance ID and message bus
    /// configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if the adapter cannot be constructed; the specific failure
    /// conditions are defined by each implementation.
    fn new(
        trader_id: TraderId,
        instance_id: UUID4,
        config: MessageBusConfig,
    ) -> anyhow::Result<Self::DatabaseType>;
    /// Returns whether the adapter is closed.
    fn is_closed(&self) -> bool;
    /// Publishes the given `payload` on `topic`.
    ///
    /// The method returns nothing, so a failure cannot be reported to the caller
    /// through its return value.
    fn publish(&self, topic: String, payload: Bytes);
    /// Closes the adapter.
    fn close(&mut self);
}
181
182////////////////////////////////////////////////////////////////////////////////
183// Tests
184////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use rstest::*;
    use serde_json::json;

    use super::*;

    /// Every field of the default `DatabaseConfig` has the expected value.
    #[rstest]
    fn test_default_database_config() {
        let config = DatabaseConfig::default();
        assert_eq!(config.database_type, "redis");
        assert_eq!(config.host, None);
        assert_eq!(config.port, None);
        assert_eq!(config.username, None);
        assert_eq!(config.password, None);
        assert!(!config.ssl);
        assert_eq!(config.connection_timeout, 20);
        assert_eq!(config.response_timeout, 20);
        assert_eq!(config.number_of_retries, 100);
        assert_eq!(config.exponent_base, 2);
        assert_eq!(config.max_delay, 1000);
        assert_eq!(config.factor, 2);
    }

    /// A fully specified JSON object (using the `type` alias) populates every field.
    #[rstest]
    fn test_deserialize_database_config() {
        let config_json = json!({
            "type": "redis",
            "host": "localhost",
            "port": 6379,
            "username": "user",
            "password": "pass",
            "ssl": true,
            "connection_timeout": 30,
            "response_timeout": 10,
            "number_of_retries": 3,
            "exponent_base": 2,
            "max_delay": 10,
            "factor": 2
        });
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
        assert_eq!(config.database_type, "redis");
        assert_eq!(config.host, Some("localhost".to_string()));
        assert_eq!(config.port, Some(6379));
        assert_eq!(config.username, Some("user".to_string()));
        assert_eq!(config.password, Some("pass".to_string()));
        assert!(config.ssl);
        assert_eq!(config.connection_timeout, 30);
        assert_eq!(config.response_timeout, 10);
        assert_eq!(config.number_of_retries, 3);
        assert_eq!(config.exponent_base, 2);
        assert_eq!(config.max_delay, 10);
        assert_eq!(config.factor, 2);
    }

    /// Fields missing from the input fall back to the `Default` values because of the
    /// container-level `#[serde(default)]`.
    #[rstest]
    fn test_deserialize_database_config_missing_fields_use_defaults() {
        let config: DatabaseConfig =
            serde_json::from_value(json!({ "host": "localhost" })).unwrap();
        let expected = DatabaseConfig {
            host: Some("localhost".to_string()),
            ..Default::default()
        };
        assert_eq!(config, expected);
    }

    /// Every field of the default `MessageBusConfig` has the expected value.
    #[rstest]
    fn test_default_message_bus_config() {
        let config = MessageBusConfig::default();
        assert_eq!(config.database, None);
        assert_eq!(config.encoding, SerializationEncoding::MsgPack);
        assert!(!config.timestamps_as_iso8601);
        assert_eq!(config.buffer_interval_ms, None);
        assert_eq!(config.autotrim_mins, None);
        assert!(config.use_trader_prefix);
        assert!(config.use_trader_id);
        assert!(!config.use_instance_id);
        assert_eq!(config.streams_prefix, "stream");
        assert!(config.stream_per_topic);
        assert_eq!(config.external_streams, None);
        assert_eq!(config.types_filter, None);
        assert_eq!(config.heartbeat_interval_secs, None);
    }

    /// A JSON object with a nested database section populates both the bus settings
    /// and the nested `DatabaseConfig`; the omitted heartbeat stays at its default.
    #[rstest]
    fn test_deserialize_message_bus_config() {
        let config_json = json!({
            "database": {
                "type": "redis",
                "host": "localhost",
                "port": 6379,
                "username": "user",
                "password": "pass",
                "ssl": true,
                "connection_timeout": 30,
                "response_timeout": 10,
                "number_of_retries": 3,
                "exponent_base": 2,
                "max_delay": 10,
                "factor": 2
            },
            "encoding": "json",
            "timestamps_as_iso8601": true,
            "buffer_interval_ms": 100,
            "autotrim_mins": 60,
            "use_trader_prefix": false,
            "use_trader_id": false,
            "use_instance_id": true,
            "streams_prefix": "data_streams",
            "stream_per_topic": false,
            "external_streams": ["stream1", "stream2"],
            "types_filter": ["type1", "type2"]
        });
        let config: MessageBusConfig = serde_json::from_value(config_json).unwrap();
        assert_eq!(
            config.database,
            Some(DatabaseConfig {
                database_type: "redis".to_string(),
                host: Some("localhost".to_string()),
                port: Some(6379),
                username: Some("user".to_string()),
                password: Some("pass".to_string()),
                ssl: true,
                connection_timeout: 30,
                response_timeout: 10,
                number_of_retries: 3,
                exponent_base: 2,
                max_delay: 10,
                factor: 2,
            })
        );
        assert_eq!(config.encoding, SerializationEncoding::Json);
        assert!(config.timestamps_as_iso8601);
        assert_eq!(config.buffer_interval_ms, Some(100));
        assert_eq!(config.autotrim_mins, Some(60));
        assert!(!config.use_trader_prefix);
        assert!(!config.use_trader_id);
        assert!(config.use_instance_id);
        assert_eq!(config.streams_prefix, "data_streams");
        assert!(!config.stream_per_topic);
        assert_eq!(
            config.external_streams,
            Some(vec!["stream1".to_string(), "stream2".to_string()])
        );
        assert_eq!(
            config.types_filter,
            Some(vec!["type1".to_string(), "type2".to_string()])
        );
        // Not present in the input, so the container-level serde default applies.
        assert_eq!(config.heartbeat_interval_secs, None);
    }
}