use std::fmt::Display;

use bytes::Bytes;
use nautilus_core::UUID4;
use nautilus_model::identifiers::TraderId;
use serde::{Deserialize, Serialize};

use crate::enums::SerializationEncoding;

25#[derive(Clone, Debug, Serialize, Deserialize)]
27#[cfg_attr(
28 feature = "python",
29 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
30)]
31pub struct BusMessage {
32 pub topic: String,
34 pub payload: Bytes,
36}
37
38impl Display for BusMessage {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(
41 f,
42 "[{}] {}",
43 self.topic,
44 String::from_utf8_lossy(&self.payload)
45 )
46 }
47}
48
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
55#[serde(default)]
56pub struct DatabaseConfig {
57 #[serde(alias = "type")]
59 pub database_type: String,
60 pub host: Option<String>,
62 pub port: Option<u16>,
64 pub username: Option<String>,
66 pub password: Option<String>,
68 pub ssl: bool,
70 pub connection_timeout: u16,
72 pub response_timeout: u16,
74 pub number_of_retries: usize,
76 pub exponent_base: u64,
78 pub max_delay: u64,
80 pub factor: u64,
82}
83
84impl Default for DatabaseConfig {
85 fn default() -> Self {
87 Self {
88 database_type: "redis".to_string(),
89 host: None,
90 port: None,
91 username: None,
92 password: None,
93 ssl: false,
94 connection_timeout: 20,
95 response_timeout: 20,
96 number_of_retries: 100,
97 exponent_base: 2,
98 max_delay: 1000,
99 factor: 2,
100 }
101 }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
106#[serde(default)]
107pub struct MessageBusConfig {
108 pub database: Option<DatabaseConfig>,
110 pub encoding: SerializationEncoding,
112 pub timestamps_as_iso8601: bool,
115 pub buffer_interval_ms: Option<u32>,
119 pub autotrim_mins: Option<u32>,
123 pub use_trader_prefix: bool,
125 pub use_trader_id: bool,
127 pub use_instance_id: bool,
129 pub streams_prefix: String,
131 pub stream_per_topic: bool,
134 pub external_streams: Option<Vec<String>>,
136 pub types_filter: Option<Vec<String>>,
138 pub heartbeat_interval_secs: Option<u16>,
140}
141
142impl Default for MessageBusConfig {
143 fn default() -> Self {
145 Self {
146 database: None,
147 encoding: SerializationEncoding::MsgPack,
148 timestamps_as_iso8601: false,
149 buffer_interval_ms: None,
150 autotrim_mins: None,
151 use_trader_prefix: true,
152 use_trader_id: true,
153 use_instance_id: false,
154 streams_prefix: "stream".to_string(),
155 stream_per_topic: true,
156 external_streams: None,
157 types_filter: None,
158 heartbeat_interval_secs: None,
159 }
160 }
161}
162
163pub trait MessageBusDatabaseAdapter {
170 type DatabaseType;
171
172 fn new(
173 trader_id: TraderId,
174 instance_id: UUID4,
175 config: MessageBusConfig,
176 ) -> anyhow::Result<Self::DatabaseType>;
177 fn is_closed(&self) -> bool;
178 fn publish(&self, topic: String, payload: Bytes);
179 fn close(&mut self);
180}
181
#[cfg(test)]
mod tests {
    use rstest::*;
    use serde_json::json;

    use super::*;

    #[rstest]
    fn test_bus_message_display() {
        let message = BusMessage {
            topic: "test.topic".to_string(),
            payload: Bytes::from_static(b"payload"),
        };
        assert_eq!(message.to_string(), "[test.topic] payload");
    }

    #[rstest]
    fn test_default_database_config() {
        let config = DatabaseConfig::default();
        assert_eq!(config.database_type, "redis");
        assert_eq!(config.host, None);
        assert_eq!(config.port, None);
        assert_eq!(config.username, None);
        assert_eq!(config.password, None);
        assert!(!config.ssl);
        assert_eq!(config.connection_timeout, 20);
        assert_eq!(config.response_timeout, 20);
        assert_eq!(config.number_of_retries, 100);
        assert_eq!(config.exponent_base, 2);
        assert_eq!(config.max_delay, 1000);
        assert_eq!(config.factor, 2);
    }

    #[rstest]
    fn test_deserialize_database_config() {
        let config_json = json!({
            "type": "redis",
            "host": "localhost",
            "port": 6379,
            "username": "user",
            "password": "pass",
            "ssl": true,
            "connection_timeout": 30,
            "response_timeout": 10,
            "number_of_retries": 3,
            "exponent_base": 2,
            "max_delay": 10,
            "factor": 2
        });
        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
        assert_eq!(config.database_type, "redis");
        assert_eq!(config.host, Some("localhost".to_string()));
        assert_eq!(config.port, Some(6379));
        assert_eq!(config.username, Some("user".to_string()));
        assert_eq!(config.password, Some("pass".to_string()));
        assert!(config.ssl);
        assert_eq!(config.connection_timeout, 30);
        assert_eq!(config.response_timeout, 10);
        assert_eq!(config.number_of_retries, 3);
        assert_eq!(config.exponent_base, 2);
        assert_eq!(config.max_delay, 10);
        assert_eq!(config.factor, 2);
    }

    #[rstest]
    fn test_default_message_bus_config() {
        let config = MessageBusConfig::default();
        assert_eq!(config.database, None);
        assert_eq!(config.encoding, SerializationEncoding::MsgPack);
        assert!(!config.timestamps_as_iso8601);
        assert_eq!(config.buffer_interval_ms, None);
        assert_eq!(config.autotrim_mins, None);
        assert!(config.use_trader_prefix);
        assert!(config.use_trader_id);
        assert!(!config.use_instance_id);
        assert_eq!(config.streams_prefix, "stream");
        assert!(config.stream_per_topic);
        assert_eq!(config.external_streams, None);
        assert_eq!(config.types_filter, None);
        assert_eq!(config.heartbeat_interval_secs, None);
    }

    // Uses `#[rstest]` like the other tests in this module.
    #[rstest]
    fn test_deserialize_message_bus_config() {
        let config_json = json!({
            "database": {
                "type": "redis",
                "host": "localhost",
                "port": 6379,
                "username": "user",
                "password": "pass",
                "ssl": true,
                "connection_timeout": 30,
                "response_timeout": 10,
                "number_of_retries": 3,
                "exponent_base": 2,
                "max_delay": 10,
                "factor": 2
            },
            "encoding": "json",
            "timestamps_as_iso8601": true,
            "buffer_interval_ms": 100,
            "autotrim_mins": 60,
            "use_trader_prefix": false,
            "use_trader_id": false,
            "use_instance_id": true,
            "streams_prefix": "data_streams",
            "stream_per_topic": false,
            "external_streams": ["stream1", "stream2"],
            "types_filter": ["type1", "type2"]
        });
        let config: MessageBusConfig = serde_json::from_value(config_json).unwrap();

        // The nested database section must round-trip as well.
        let expected_database = DatabaseConfig {
            database_type: "redis".to_string(),
            host: Some("localhost".to_string()),
            port: Some(6379),
            username: Some("user".to_string()),
            password: Some("pass".to_string()),
            ssl: true,
            connection_timeout: 30,
            response_timeout: 10,
            number_of_retries: 3,
            exponent_base: 2,
            max_delay: 10,
            factor: 2,
        };
        assert_eq!(config.database, Some(expected_database));
        assert_eq!(config.encoding, SerializationEncoding::Json);
        assert!(config.timestamps_as_iso8601);
        assert_eq!(config.buffer_interval_ms, Some(100));
        assert_eq!(config.autotrim_mins, Some(60));
        assert!(!config.use_trader_prefix);
        assert!(!config.use_trader_id);
        assert!(config.use_instance_id);
        assert_eq!(config.streams_prefix, "data_streams");
        assert!(!config.stream_per_topic);
        assert_eq!(
            config.external_streams,
            Some(vec!["stream1".to_string(), "stream2".to_string()])
        );
        assert_eq!(
            config.types_filter,
            Some(vec!["type1".to_string(), "type2".to_string()])
        );
        // Absent from the input, so it falls back to the struct default.
        assert_eq!(config.heartbeat_interval_secs, None);
    }
}